Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public class WeatherAgentCardProducer {
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
import io.a2a.server.tasks.TaskUpdater;
import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.Message;
import io.a2a.spec.Part;
Expand Down Expand Up @@ -174,14 +174,12 @@ public class WeatherAgentExecutorProducer {
}

@Override
public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
TaskUpdater updater = new TaskUpdater(context, eventQueue);

public void execute(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError {
// mark the task as submitted and start working on it
if (context.getTask() == null) {
updater.submit();
agentEmitter.submit();
}
updater.startWork();
agentEmitter.startWork();

// extract the text from the message
String userMessage = extractTextFromMessage(context.getMessage());
Expand All @@ -190,16 +188,16 @@ public class WeatherAgentExecutorProducer {
String response = weatherAgent.chat(userMessage);

// create the response part
TextPart responsePart = new TextPart(response, null);
TextPart responsePart = new TextPart(response);
List<Part<?>> parts = List.of(responsePart);

// add the response as an artifact and complete the task
updater.addArtifact(parts, null, null, null);
updater.complete();
agentEmitter.addArtifact(parts);
agentEmitter.complete();
}

@Override
public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
public void cancel(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError {
Task task = context.getTask();

if (task.getStatus().state() == TaskState.CANCELED) {
Expand All @@ -213,8 +211,7 @@ public class WeatherAgentExecutorProducer {
}

// cancel the task
TaskUpdater updater = new TaskUpdater(context, eventQueue);
updater.cancel();
agentEmitter.cancel();
}

private String extractTextFromMessage(Message message) {
Expand Down
21 changes: 10 additions & 11 deletions examples/cloud-deployment/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,7 @@ The agent (`CloudAgentExecutorProducer`) implements a command-based protocol:

```java
@Override
public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
TaskUpdater updater = new TaskUpdater(context, eventQueue);
public void execute(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError {
String messageText = extractTextFromMessage(context.getMessage()).trim().toLowerCase();

// Get pod name from Kubernetes downward API
Expand All @@ -234,21 +233,21 @@ public void execute(RequestContext context, EventQueue eventQueue) throws JSONRP
if ("complete".equals(messageText)) {
// Completion trigger - add final artifact and complete
String artifactText = "Completed by " + podName;
List<Part<?>> parts = List.of(new TextPart(artifactText, null));
updater.addArtifact(parts);
updater.complete(); // Transition to COMPLETED state
List<Part<?>> parts = List.of(new TextPart(artifactText));
agentEmitter.addArtifact(parts);
agentEmitter.complete(); // Transition to COMPLETED state
} else if (context.getTask() == null) {
// Initial "start" message - create task in SUBMITTED → WORKING state
updater.submit();
updater.startWork();
agentEmitter.submit();
agentEmitter.startWork();
String artifactText = "Started by " + podName;
List<Part<?>> parts = List.of(new TextPart(artifactText, null));
updater.addArtifact(parts);
List<Part<?>> parts = List.of(new TextPart(artifactText));
agentEmitter.addArtifact(parts);
} else {
// Subsequent "process" messages - add artifacts (fire-and-forget, stays WORKING)
String artifactText = "Processed by " + podName;
List<Part<?>> parts = List.of(new TextPart(artifactText, null));
updater.addArtifact(parts);
List<Part<?>> parts = List.of(new TextPart(artifactText));
agentEmitter.addArtifact(parts);
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
import io.a2a.server.tasks.TaskUpdater;
import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.A2AError;
import io.a2a.spec.InternalError;
import io.a2a.spec.Message;
Expand Down Expand Up @@ -46,8 +45,7 @@ public AgentExecutor agentExecutor() {
private static class CloudAgentExecutor implements AgentExecutor {

@Override
public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
TaskUpdater updater = new TaskUpdater(context, eventQueue);
public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {

try {
// Extract user message and normalize
Expand Down Expand Up @@ -75,26 +73,26 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
LOGGER.info("Completion requested on pod: {}", podName);
String artifactText = "Completed by " + podName;
List<Part<?>> parts = List.of(new TextPart(artifactText));
updater.addArtifact(parts);
updater.complete();
agentEmitter.addArtifact(parts);
agentEmitter.complete();
LOGGER.info("Task completed on pod: {}", podName);

} else if (context.getTask() == null) {
// Initial message - create task in SUBMITTED → WORKING state
LOGGER.info("Creating new task on pod: {}", podName);
updater.submit();
updater.startWork();
agentEmitter.submit();
agentEmitter.startWork();
String artifactText = "Started by " + podName;
List<Part<?>> parts = List.of(new TextPart(artifactText));
updater.addArtifact(parts);
agentEmitter.addArtifact(parts);
LOGGER.info("Task created and started on pod: {}", podName);

} else {
// Subsequent messages - add artifacts (fire-and-forget, stays in WORKING)
LOGGER.info("Adding artifact on pod: {}", podName);
String artifactText = "Processed by " + podName;
List<Part<?>> parts = List.of(new TextPart(artifactText));
updater.addArtifact(parts);
agentEmitter.addArtifact(parts);
// No state change - task remains in WORKING
LOGGER.info("Artifact added on pod: {}", podName);
}
Expand All @@ -109,10 +107,9 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
}

@Override
public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
LOGGER.info("Task cancellation requested");
TaskUpdater updater = new TaskUpdater(context, eventQueue);
updater.cancel();
agentEmitter.cancel();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import io.a2a.A2A;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.A2AError;
import io.a2a.spec.UnsupportedOperationError;

Expand All @@ -17,12 +16,12 @@ public class AgentExecutorProducer {
public AgentExecutor agentExecutor() {
return new AgentExecutor() {
@Override
public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
eventQueue.enqueueEvent(A2A.toAgentMessage("Hello World"));
public void execute(RequestContext context, AgentEmitter emitter) throws A2AError {
emitter.sendMessage("Hello World");
}

@Override
public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
public void cancel(RequestContext context, AgentEmitter emitter) throws A2AError {
throw new UnsupportedOperationError();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
import io.a2a.server.tasks.AgentEmitter;
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.TaskUpdater;
import io.a2a.spec.A2AError;
import io.a2a.spec.InvalidRequestError;
import io.a2a.spec.Message;
Expand All @@ -33,31 +32,29 @@ public class JpaDatabasePushNotificationConfigStoreTestAgentExecutor {
public AgentExecutor agentExecutor() {
return new AgentExecutor() {
@Override
public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
String command = getLastTextPart(context.getMessage());

// Switch based on the command from the test client
switch (command) {
case "create":
taskUpdater.submit();
agentEmitter.submit();
break;
case "update":
// Perform a meaningful update, like adding an artifact.
// This state change is what will trigger the notification.
taskUpdater.addArtifact(List.of(new TextPart("updated-artifact")), "art-1", "test", null);
agentEmitter.addArtifact(List.of(new TextPart("updated-artifact")), "art-1", "test", null);
break;
default:
// On the first message (which might have no text), just submit.
taskUpdater.submit();
agentEmitter.submit();
break;
}
}

@Override
public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
taskUpdater.cancel();
public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
agentEmitter.cancel();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedEx
String taskId = "test-task-2";
EventQueue queue = queueManager.createOrTap(taskId);

ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, getTaskStatusUpdateEventWithNewId(taskId));
queueManager.onReplicatedEvent(replicatedEvent);

assertEquals(0, strategy.getCallCount());
Expand All @@ -174,7 +174,7 @@ void testReplicationStrategyWithCountingImplementation() throws InterruptedExcep
assertEquals(taskId, countingStrategy.getLastTaskId());
assertEquals(event, countingStrategy.getLastEvent());

ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, getTaskStatusUpdateEventWithNewId(taskId));
queueManager.onReplicatedEvent(replicatedEvent);

assertEquals(2, countingStrategy.getCallCount());
Expand Down Expand Up @@ -488,7 +488,7 @@ void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws Interrup

// Create queue and enqueue an event
EventQueue queue = queueManager.createOrTap(taskId);
queue.enqueueEvent(testEvent);
queue.enqueueEvent(getTaskStatusUpdateEventWithNewId(taskId));

// Dequeue to clear the queue
try {
Expand Down Expand Up @@ -613,6 +613,11 @@ void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedExcept
"Second event should be QueueClosedEvent");
}

private TaskStatusUpdateEvent getTaskStatusUpdateEventWithNewId(String taskId) {
return TaskStatusUpdateEvent.builder((TaskStatusUpdateEvent) testEvent).taskId(taskId).build();
}


private static class NoOpReplicationStrategy implements ReplicationStrategy {
@Override
public void send(String taskId, Event event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
import io.a2a.server.tasks.TaskUpdater;
import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.A2AError;
import io.a2a.spec.Task;
import io.a2a.spec.TextPart;
Expand All @@ -18,9 +17,8 @@
*/
public class MultiInstanceReplicationAgentExecutor implements AgentExecutor {
@Override
public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
Task task = context.getTask();
TaskUpdater updater = new TaskUpdater(context, eventQueue);

// Check if message contains "close" signal
boolean shouldClose = context.getMessage().parts().stream()
Expand All @@ -30,19 +28,18 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr

if (shouldClose) {
// Close the task
updater.complete();
agentEmitter.complete();
} else if (task == null) {
// First message - create task in SUBMITTED state
updater.submit();
agentEmitter.submit();
} else {
// Subsequent messages - add as artifact
updater.addArtifact(context.getMessage().parts());
agentEmitter.addArtifact(context.getMessage().parts());
}
}

@Override
public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
TaskUpdater updater = new TaskUpdater(context, eventQueue);
updater.cancel();
public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
agentEmitter.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
import io.a2a.server.tasks.TaskUpdater;
import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.A2AError;
import io.a2a.spec.InvalidRequestError;
import io.a2a.spec.Message;
Expand All @@ -28,37 +27,34 @@ public class ReplicationTestAgentExecutor {
public AgentExecutor agentExecutor() {
return new AgentExecutor() {
@Override
public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {

TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
String lastText = getLastTextPart(context.getMessage());

switch (lastText) {
case "create":
// Submit task - this should trigger TaskStatusUpdateEvent
taskUpdater.submit();
agentEmitter.submit();
break;
case "working":
// Move task to WORKING state without completing - keeps queue alive
taskUpdater.submit();
taskUpdater.startWork();
agentEmitter.submit();
agentEmitter.startWork();
break;
case "complete":
// Complete the task - should trigger poison pill generation
taskUpdater.submit();
taskUpdater.startWork();
taskUpdater.addArtifact(List.of(new TextPart("Task completed")));
taskUpdater.complete();
agentEmitter.submit();
agentEmitter.startWork();
agentEmitter.addArtifact(List.of(new TextPart("Task completed")));
agentEmitter.complete();
break;
default:
throw new InvalidRequestError("Unknown command: " + lastText);
}
}

@Override
public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
taskUpdater.cancel();
public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
agentEmitter.cancel();
}
};
}
Expand Down
Loading
Loading