Commits (20)
9506921  T6 M0: Technical plan + analysis notebook for multi-objective vector … (Feb 9, 2026)
3b2a0b2  T6 M0: Apply Xavier's review fixes (paths, dates, motivation, real LL… (Feb 10, 2026)
249bde6  T6 M0: Apply Xavier's review fixes to technical plan (Feb 10, 2026)
2213a19  T6 M1: Multi-objective vector scores — ObjectiveConfig, objectives.py… (Feb 12, 2026)
4590102  T6 M1: Fix Colab install cell for Python 3.12 compatibility (Feb 12, 2026)
3b8d2ed  T6 M1: Fix scalar objective computation, document config=None fallbac… (Feb 12, 2026)
7401ca2  T6 M1: Apply Ching-An review - to_score_dict rename, configurable sca… (Feb 16, 2026)
e2a66de  T6 M2 prep: align tech plan per Xavier review, add Allen's multi_obje… (Feb 18, 2026)
270a1b6  T6 M2: Multi-objective support for BeamsearchAlgorithm + PrioritySear… (Feb 19, 2026)
a888b93  T6 M2: Add validation notebook with convex function multi-objective d… (Feb 19, 2026)
bdd5260  fix: define _repo_root in Colab setup cell for notebook (Feb 19, 2026)
94a1731  fix: reset Colab working directory before clone in setup cell (Feb 19, 2026)
a47cdc1  fix: improve notebook charts - more training steps, proper loss evalu… (Feb 19, 2026)
0a79496  fix: move DATASET definition to imports cell for reliability (Feb 19, 2026)
ca1349b  fix: RewardGuide get_feedback() now uses deepcopy - prevents env hori… (Feb 19, 2026)
427cb3f  fix: Beamsearch train() returns final_validation_score instead of har… (Feb 19, 2026)
106bd11  T6 M2: BBEH notebook with dotenv + OpenAI direct support (Feb 20, 2026)
d16ef41  fix: BBEH notebook Colab setup cell (Feb 20, 2026)
5c1b7e3  fix: BBEH notebook measures end-to-end graph time (LLM + exec) not ju… (Feb 20, 2026)
b8f5023  fix: BBEH notebook resilient to Trace bundle corruption after failed … (Feb 20, 2026)
Files changed (large diffs are not rendered by default):

docs/T6_technical_plan.md (+843, -0)
examples/multi_objective_convex_fn.py (+663, -0)
examples/notebooks/t6_m0_analysis.ipynb (+935, -0)
examples/notebooks/t6_m1_vector_scores.ipynb (+1,127, -0)
examples/notebooks/t6_m2_bbeh.ipynb (+877, -0)
examples/notebooks/t6_m2_trainers.ipynb (+185, -0)

144 changes: 139 additions & 5 deletions opto/features/priority_search/priority_search.py
@@ -10,6 +10,10 @@
from opto.trainer.algorithms.basic_algorithms import batchify
from opto.features.priority_search.search_template import SearchTemplate, Samples, BatchRollout, save_train_config
from opto.features.priority_search.utils import set_module_parameters, remap_update_dict, create_module_from_update_dict, is_module_copy, deepcopy_module
from opto.trainer.objectives import (
ObjectiveConfig, to_score_dict, apply_minimize,
weighted_scalarize, pareto_rank, aggregate_score_dicts
)


class ModuleCandidate:
@@ -85,6 +89,19 @@ def mean_score(self):
return None
return safe_mean([r['score'] for r in self.rollouts])

def mean_score_dict(self) -> Optional[Dict[str, float]]:
"""Compute the per-metric mean score dict across rollouts.

Returns None if no rollouts have 'score_dict' entries.
"""
if not self.rollouts:
return None
score_dicts = [r.get("score_dict") for r in self.rollouts]
score_dicts = [sd for sd in score_dicts if isinstance(sd, dict)]
if not score_dicts:
return None
return aggregate_score_dicts(score_dicts)
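    # Illustrative sketch (not part of the diff): given rollouts with
    #   score_dicts = [{"acc": 0.8, "lat": 1.0}, {"acc": 0.6, "lat": 3.0}]
    # aggregate_score_dicts is expected to average each metric, yielding
    #   {"acc": 0.7, "lat": 2.0}
    # (assumes a per-key mean; see opto.trainer.objectives for the exact
    # handling of metrics missing from some rollouts).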

def compute_score_confidence(self, min_score, max_score, scaling_constant=1.0, total_trials=1):
"""Compute the UCB, mean, LCB score for the candidate. After queried, the number of confidence queries is incremented.

@@ -213,6 +230,76 @@ def _criterion(x):
return p if p is not None else 0
return max(self.memory, key=lambda x: _criterion(x))


class ParetoHeapMemory(HeapMemory):
"""Heap-backed memory that can pop using Pareto-front selection among top-K items.

Keeps scalar priority for push/best efficiency. When mode='pareto',
pop() selects from the Pareto front among the top-K candidates by scalar
priority, with tie-breaking via weighted scalarization.

push() and best() are inherited unchanged from HeapMemory.
"""

def __init__(self, size=None, processing_fun: Callable = None, *,
pareto_k: int = 20,
score_dict_fn: Optional[Callable] = None,
objective_config: Optional[ObjectiveConfig] = None):
super().__init__(size=size, processing_fun=processing_fun)
self.pareto_k = pareto_k
self.score_dict_fn = score_dict_fn
self.objective_config = objective_config

def pop(self):
"""Pop a candidate, using Pareto-front selection when configured.

If objective_config is None, mode != 'pareto', or score_dict_fn is
missing, falls back to standard heapq.heappop (scalar priority).
"""
if not self.memory:
raise IndexError("pop from an empty heap memory")

cfg = self.objective_config
if cfg is None or cfg.mode != "pareto" or self.score_dict_fn is None:
return heapq.heappop(self.memory)

# Extract top-K by scalar priority (heap stores -score, so nsmallest = highest scores)
k = min(self.pareto_k, len(self.memory))
topk = heapq.nsmallest(k, self.memory)
candidates = [c for _, c in topk]

# Get score dicts for each candidate
score_dicts = []
for c in candidates:
sd = self.score_dict_fn(c)
sd = to_score_dict(sd) if sd is not None else None
score_dicts.append(sd)

# Fallback to standard pop if any candidate lacks a score dict
if any(sd is None for sd in score_dicts):
return heapq.heappop(self.memory)

# Apply minimize normalization and compute Pareto ranks
score_dicts = [apply_minimize(sd, cfg.minimize) for sd in score_dicts]
ranks = pareto_rank(score_dicts, metrics=cfg.pareto_metrics)
front_idx = [i for i, r in enumerate(ranks) if r == 0]

# Tie-break among front by weighted scalarization
def _tie_break(i):
sd = score_dicts[i]
if cfg.weights:
return float(weighted_scalarize(sd, cfg.weights, cfg.missing_value))
return float(np.mean(list(sd.values())))

chosen_local = max(front_idx, key=_tie_break)
chosen_item = topk[chosen_local]

# Remove chosen item from heap (O(n), acceptable for small K)
self.memory.remove(chosen_item)
heapq.heapify(self.memory)
return chosen_item
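
    # Worked example of the selection above (hedged; assumes pareto_rank
    # assigns rank 0 to non-dominated dicts). With all metrics maximized:
    #   A = {"acc": 0.9, "spd": 0.1}, B = {"acc": 0.5, "spd": 0.9},
    #   C = {"acc": 0.4, "spd": 0.2}
    # A and B are mutually non-dominated (rank 0), while B dominates C, so
    # pop() chooses between A and B via the weighted tie-break; with
    # weights {"acc": 0.7, "spd": 0.3}, A scores 0.66 vs B's 0.62 and wins.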


# TODO check saving and loading
class PrioritySearch(SearchTemplate):
""" A search algorithm that uses a priority queue to explore the parameter space and propose new candidates.
@@ -241,6 +328,8 @@ def train(self,
guide, # guide to provide feedback
train_dataset, # dataset of (x, info) pairs to train the agent
*,
# multi-objective
objective_config: Optional[ObjectiveConfig] = None, # ObjectiveConfig for multi-objective selection (None = scalar)
# validation
validate_dataset = None, # same format as train_dataset; if None, use the current batch.
validate_guide = None, # to provide scores for the validation set
@@ -318,6 +407,7 @@ def train(self,
short_term_memory_size=short_term_memory_size,
memory_update_frequency=memory_update_frequency,
decouple_optimizers=decouple_optimizers,
objective_config=objective_config,
)

self._enforce_using_data_collecting_candidates = True
@@ -354,7 +444,8 @@ def _initialize_search_parameters(self, *,
long_term_memory_size,
short_term_memory_size,
memory_update_frequency,
decouple_optimizers):
decouple_optimizers,
objective_config=None):
"""Initialize search parameters and memory structures.

Args:
@@ -369,6 +460,7 @@ def _initialize_search_parameters(self, *,
short_term_memory_size (int): Size of the short-term memory
memory_update_frequency (int): The candidates are merged into long-term memory after this many iterations.
decouple_optimizers (bool): Whether to decouple optimizers for each candidate
objective_config (ObjectiveConfig, optional): Multi-objective selection config. None = scalar.
"""
# Validate and adjust num_candidates based on number of optimizers
if num_candidates < len(self._optimizers):
@@ -410,8 +502,26 @@
else:
print(f"PrioritySearch initialized with both short-term and long-term memory. Candidates will be merged into long-term memory every {memory_update_frequency} iterations.")

-        self.long_term_memory = HeapMemory(size=long_term_memory_size, processing_fun=self.compress_candidate_memory)
-        self.short_term_memory = HeapMemory(size=short_term_memory_size)
+        self.objective_config = objective_config
+        use_pareto_memory = (objective_config is not None
+                             and objective_config.mode == "pareto")
+        if use_pareto_memory:
+            self.long_term_memory = ParetoHeapMemory(
+                size=long_term_memory_size,
+                processing_fun=self.compress_candidate_memory,
+                pareto_k=20,
+                score_dict_fn=lambda c: c.mean_score_dict(),
+                objective_config=objective_config,
+            )
+            self.short_term_memory = ParetoHeapMemory(
+                size=short_term_memory_size,
+                pareto_k=20,
+                score_dict_fn=lambda c: c.mean_score_dict(),
+                objective_config=objective_config,
+            )
+        else:
+            self.long_term_memory = HeapMemory(size=long_term_memory_size, processing_fun=self.compress_candidate_memory)
+            self.short_term_memory = HeapMemory(size=short_term_memory_size)
self.memory_update_frequency = memory_update_frequency

def update(self,
@@ -635,6 +745,20 @@ def validate(self,
for c, rollouts in matched_candidates_and_samples.items(): # rollouts is a list of BatchRollouts
results[c] = [ r for rr in rollouts for r in rr.to_list()] # we only need the list of dicts

# Populate score_dict in each rollout when multi-objective is active
cfg = getattr(self, 'objective_config', None)
if cfg is not None and cfg.mode != "scalar":
guide = self.validate_sampler.guide
for c, rollout_list in results.items():
for rollout in rollout_list:
try:
sd = guide.get_score_dict(
rollout['x'], rollout['target'], rollout['info']
)
rollout['score_dict'] = sd
except Exception:
pass # guide may not support get_score_dict; skip gracefully

return results
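
    # For reference, a guide compatible with the block above might expose
    # something like the following (hypothetical sketch; only the method name
    # and (x, target, info) signature are taken from the call in this diff):
    #
    #     def get_score_dict(self, x, target, info):
    #         return {"accuracy": self.accuracy(x, target),
    #                 "brevity": self.brevity(x)}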

def match_candidates_and_samples(
Expand Down Expand Up @@ -769,8 +893,18 @@ def compute_exploration_priority(self, candidate) -> float:
"""
if not isinstance(candidate, ModuleCandidate):
raise TypeError("candidate must be an instance of ModuleCandidate.")
-        # By default, we compute the mean score of the rollouts

+        # Multi-objective priority: use weighted scalarization of score_dict when available
+        if getattr(self, 'objective_config', None) is not None:
+            sd = candidate.mean_score_dict()
+            if sd is not None:
+                cfg = self.objective_config
+                sd = apply_minimize(to_score_dict(sd), cfg.minimize)
+                if cfg.weights:
+                    return float(weighted_scalarize(sd, cfg.weights, cfg.missing_value))
+                return float(np.mean(list(sd.values())))
+
+        # Fall through to existing scalar logic
if self.score_function == 'mean':
# Compute the mean score of the candidate's rollouts
return candidate.mean_score()
@@ -795,7 +929,7 @@ def compress_candidate_memory(self, candidate: ModuleCandidate) -> ModuleCandidate:
def _process_rollout(rollout):
# rollout is a dict containing module, x, info, target, score, feedback
for k in rollout:
-            if k not in ['score']:
+            if k not in ['score', 'score_dict']:
rollout[k] = None
candidate = copy.copy(candidate) # make a copy of the candidate to avoid modifying the original one
candidate.rollouts = copy.deepcopy(candidate.rollouts) # deep copy the rollouts to avoid modifying the original one
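To make the ParetoHeapMemory.pop() logic above easy to sanity-check, here is a minimal, self-contained sketch of the same selection math in plain Python. It re-implements the front computation and tie-break locally rather than importing opto, so the helper names and the exact semantics of opto.trainer.objectives.pareto_rank are assumptions, not the library API.

# Standalone sketch of Pareto-front pop selection (assumes maximization after
# apply_minimize-style sign flips; local helpers, not the opto API).
def dominates(a, b, metrics):
    """True if a is at least as good as b on all metrics and better on one."""
    return (all(a[m] >= b[m] for m in metrics)
            and any(a[m] > b[m] for m in metrics))

def pareto_front(score_dicts, metrics):
    """Indices of the non-dominated score dicts (rank 0)."""
    return [i for i, sd in enumerate(score_dicts)
            if not any(dominates(other, sd, metrics)
                       for j, other in enumerate(score_dicts) if j != i)]

# Stand-ins for the top-K candidates' per-metric mean score dicts.
score_dicts = [{"acc": 0.9, "spd": 0.1},
               {"acc": 0.5, "spd": 0.9},
               {"acc": 0.4, "spd": 0.2}]
weights = {"acc": 0.7, "spd": 0.3}

front = pareto_front(score_dicts, ["acc", "spd"])  # -> [0, 1]; C is dominated
tie_break = lambda i: sum(w * score_dicts[i][m] for m, w in weights.items())
chosen = max(front, key=tie_break)
print(chosen)  # 0: candidate A wins the weighted tie-break (0.66 vs 0.62)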
108 changes: 89 additions & 19 deletions opto/trainer/algorithms/basic_algorithms.py
@@ -6,7 +6,18 @@
from opto.trainer.loader import DataLoader
from opto.trainer.utils import batch_run, async_run
from opto.optimizers.utils import print_color
-from opto.trainer.evaluators import evaluate
+from opto.trainer.evaluators import evaluate, evaluate_vector, aggregate_vector_scores
+from opto.trainer.objectives import ObjectiveConfig, select_best, apply_minimize, weighted_scalarize


def _objective_scalar(score_dict, config):
"""Compute scalar objective consistent with selection mode.

Uses weighted_scalarize(apply_minimize(...)) so the logged scalar
reflects the same weights and minimize settings used for selection.
"""
minimized = apply_minimize(score_dict, config.minimize)
return weighted_scalarize(minimized, config.weights, config.missing_value)


def standard_optimization_step(agent, x, guide, info, min_score=0):
@@ -533,6 +544,7 @@ def train(self,
validate_dataset = None, # dataset of (x, info) pairs to evaluate the agent for candidate selection
validate_guide = None, # to provide scores for the validation set
num_proposals = 4, # number of proposals to get from the optimizer
objective_config = None, # optional ObjectiveConfig for multi-objective selection
num_epochs = 1, # number of training epochs
batch_size = 1, # batch size for updating the agent
test_dataset = None, # dataset of (x, info) pairs to evaluate the agent
@@ -549,6 +561,8 @@
self.validate_guide = validate_guide or guide
self.min_score = min_score
self.current_score = None
self.objective_config = objective_config
self.current_score_dict = None # stores vector score when using multi-objective

return super().train(guide, train_dataset, num_epochs=num_epochs, batch_size=batch_size,
test_dataset=test_dataset, test_frequency=test_frequency, log_frequency=log_frequency,
Expand All @@ -571,6 +585,21 @@ def validate():
description="Validating proposals")
return np.mean(scores) if all([s is not None for s in scores]) else -np.inf

def validate_vector():
""" Validate and return aggregated vector score dict. """
score_dicts = evaluate_vector(self.agent,
self.validate_guide,
self.validate_dataset['inputs'],
self.validate_dataset['infos'],
min_score=self.min_score,
num_threads=num_threads,
description="Validating proposals (vector)")
return aggregate_vector_scores(score_dicts)

# Determine whether to use vector scoring for selection
use_vector = (self.objective_config is not None
and self.objective_config.mode != "scalar")

# TODO perhaps we can ask for multiple updates in one query or use different temperatures in different queries
# Generate different proposals
step_kwargs = dict(bypassing=True, verbose='output' if verbose else False) # we don't print the inner full message
@@ -582,25 +611,57 @@ def validate():
kwargs_list=[step_kwargs] * self.num_proposals,
max_workers=num_threads,
description=f"Generating {self.num_proposals} proposals") # async step

# Validate the proposals
candidates = []
backup_dict = {p: copy.deepcopy(p.data) for p in self.agent.parameters()} # backup the current value
-        for update_dict in update_dicts:
-            if len(update_dict) == 0:
-                continue
-            self.optimizer.update(update_dict) # set the agent with update_dict
-            score = validate() # check the score on the validation set
-            candidates.append((score, update_dict))
-            self.optimizer.update(backup_dict) # restore the backup
-
-        # Include the current parameter as a candidate
-        if self.current_score is None:
-            self.current_score = validate()
-        candidates.append((self.current_score, backup_dict))
-
-        # Find the candidate with the best score
-        best_score, best_update = max(candidates, key=lambda x: x[0])
-        self.current_score = best_score

+        if use_vector:
+            # Vector path: collect (score_dict, update_dict) for multi-objective selection
+            vector_candidates = []
+            for update_dict in update_dicts:
+                if len(update_dict) == 0:
+                    continue
+                self.optimizer.update(update_dict)
+                score_dict = validate_vector()
+                scalar_score = _objective_scalar(score_dict, self.objective_config)
+                candidates.append((scalar_score, update_dict))
+                vector_candidates.append((score_dict, update_dict))
+                self.optimizer.update(backup_dict)
+
+            # Include current parameters as a candidate
+            if self.current_score_dict is None:
+                self.current_score_dict = validate_vector()
+            if self.current_score is None:
+                self.current_score = _objective_scalar(self.current_score_dict, self.objective_config)
+            candidates.append((self.current_score, backup_dict))
+            vector_candidates.append((self.current_score_dict, backup_dict))
+
+            # Select best via multi-objective config
+            best_idx = select_best(vector_candidates, self.objective_config)
+            best_score_dict = vector_candidates[best_idx][0]
+            best_update = vector_candidates[best_idx][1]
+            best_score = _objective_scalar(best_score_dict, self.objective_config)
+            self.current_score = best_score
+            self.current_score_dict = best_score_dict
+        else:
+            # Scalar path: unchanged from original behavior
+            for update_dict in update_dicts:
+                if len(update_dict) == 0:
+                    continue
+                self.optimizer.update(update_dict) # set the agent with update_dict
+                score = validate() # check the score on the validation set
+                candidates.append((score, update_dict))
+                self.optimizer.update(backup_dict) # restore the backup
+
+            # Include the current parameter as a candidate
+            if self.current_score is None:
+                self.current_score = validate()
+            candidates.append((self.current_score, backup_dict))
+
+            # Find the candidate with the best score
+            best_score, best_update = max(candidates, key=lambda x: x[0])
+            self.current_score = best_score

if verbose:
print_color(f"Best score: {best_score} out of scores {[c[0] for c in candidates]}", 'green')
@@ -609,5 +670,14 @@ def validate():
# Make the best update
self.optimizer.update(best_update)

-        # Logging
-        self.logger.log('Validation score', best_score, self.n_iters, color='green')
+        # Logging — scalar objective for backward compatibility
+        if use_vector:
+            self.logger.log('Validation objective', best_score, self.n_iters, color='green')
+        else:
+            self.logger.log('Validation score', best_score, self.n_iters, color='green')
+
+        # Log individual vector metrics if available
+        if use_vector and isinstance(best_score_dict, dict):
+            for metric_name, metric_value in best_score_dict.items():
+                self.logger.log(f'Validation score/{metric_name}', metric_value,
+                                self.n_iters, color='green')
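
For reviewers, a minimal worked example of the _objective_scalar convention used for selection and logging above. The two helpers are re-implemented locally so the arithmetic is explicit; the real apply_minimize and weighted_scalarize in opto.trainer.objectives may handle more edge cases, so treat this as a sketch under those assumptions.

# Local re-implementation sketch (not the opto API) of the selection scalar.
def apply_minimize_local(score_dict, minimize):
    # Flip the sign of metrics flagged for minimization so that
    # "higher is better" holds uniformly afterwards.
    return {k: (-v if minimize.get(k, False) else v)
            for k, v in score_dict.items()}

def weighted_scalarize_local(score_dict, weights, missing_value=0.0):
    # Weighted sum over the configured metrics; metrics absent from the
    # score dict fall back to missing_value.
    return sum(w * score_dict.get(k, missing_value)
               for k, w in weights.items())

sd = {"accuracy": 0.8, "latency": 2.0}      # raw vector score
minimize = {"latency": True}                # lower latency is better
weights = {"accuracy": 0.7, "latency": 0.3}

scalar = weighted_scalarize_local(apply_minimize_local(sd, minimize), weights)
print(scalar)  # 0.7 * 0.8 + 0.3 * (-2.0) = -0.04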