From 8bb5d9306e8bfb1c759902a81baa5036c326c2d6 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Tue, 3 Feb 2026 17:24:13 +0000 Subject: [PATCH] feat!: Use AgentEmitter instead of EventQueue in AgentExecutor methods BREAKING CHANGE: AgentEmitter contains the methods from the old TaskUpdater and is now the only way agents send results back to the caller. This hides the EventQueue mechanism from users. Also introduced a check that when placing a full Task object on the queue, which should only be done for calls with no existing Task, that the Task's ID is the one expected for the queue, as calculated by the RequestContext. Added Message and Task builders to AgentEmitter to help with using the proper taskID and contextId. --- README.md | 21 +- examples/cloud-deployment/README.md | 21 +- .../cloud/CloudAgentExecutorProducer.java | 23 +- .../helloworld/AgentExecutorProducer.java | 9 +- ...ificationConfigStoreTestAgentExecutor.java | 17 +- .../core/ReplicatedQueueManagerTest.java | 11 +- ...MultiInstanceReplicationAgentExecutor.java | 17 +- .../tests/ReplicationTestAgentExecutor.java | 26 +- ...JpaDatabaseTaskStoreTestAgentExecutor.java | 18 +- .../server/agentexecution/AgentExecutor.java | 45 +- .../server/agentexecution/RequestContext.java | 2 +- .../java/io/a2a/server/events/EventQueue.java | 45 ++ .../DefaultRequestHandler.java | 26 +- .../io/a2a/server/tasks/AgentEmitter.java | 617 ++++++++++++++++++ .../java/io/a2a/server/tasks/TaskUpdater.java | 191 ------ .../AbstractA2ARequestHandlerTest.java | 12 +- ...UpdaterTest.java => AgentEmitterTest.java} | 90 +-- .../a2a/tck/server/AgentExecutorProducer.java | 24 +- .../apps/common/AgentExecutorProducer.java | 33 +- .../grpc/handler/GrpcHandlerTest.java | 84 ++- .../jsonrpc/handler/JSONRPCHandlerTest.java | 104 ++- .../rest/handler/RestHandlerTest.java | 31 +- 22 files changed, 953 insertions(+), 514 deletions(-) create mode 100644 server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java delete mode 100644 server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java rename server-common/src/test/java/io/a2a/server/tasks/{TaskUpdaterTest.java => AgentEmitterTest.java} (87%) diff --git a/README.md b/README.md index 50a760b10..c474eca31 100644 --- a/README.md +++ b/README.md @@ -144,7 +144,7 @@ public class WeatherAgentCardProducer { import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; import io.a2a.server.events.EventQueue; -import io.a2a.server.tasks.TaskUpdater; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.JSONRPCError; import io.a2a.spec.Message; import io.a2a.spec.Part; @@ -174,14 +174,12 @@ public class WeatherAgentExecutorProducer { } @Override - public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError { - TaskUpdater updater = new TaskUpdater(context, eventQueue); - + public void execute(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError { // mark the task as submitted and start working on it if (context.getTask() == null) { - updater.submit(); + agentEmitter.submit(); } - updater.startWork(); + agentEmitter.startWork(); // extract the text from the message String userMessage = extractTextFromMessage(context.getMessage()); @@ -190,16 +188,16 @@ public class WeatherAgentExecutorProducer { String response = weatherAgent.chat(userMessage); // create the response part - TextPart responsePart = new TextPart(response, null); + TextPart responsePart = new TextPart(response); List> parts = List.of(responsePart); // add the response as an artifact and complete the task - updater.addArtifact(parts, null, null, null); - updater.complete(); + agentEmitter.addArtifact(parts); + agentEmitter.complete(); } @Override - public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPCError { + public void cancel(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError { Task task = context.getTask(); if (task.getStatus().state() == TaskState.CANCELED) { @@ -213,8 +211,7 @@ public class WeatherAgentExecutorProducer { } // cancel the task - TaskUpdater updater = new TaskUpdater(context, eventQueue); - updater.cancel(); + agentEmitter.cancel(); } private String extractTextFromMessage(Message message) { diff --git a/examples/cloud-deployment/README.md b/examples/cloud-deployment/README.md index bf1e4cd60..fe146a6e9 100644 --- a/examples/cloud-deployment/README.md +++ b/examples/cloud-deployment/README.md @@ -224,8 +224,7 @@ The agent (`CloudAgentExecutorProducer`) implements a command-based protocol: ```java @Override -public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError { - TaskUpdater updater = new TaskUpdater(context, eventQueue); +public void execute(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError { String messageText = extractTextFromMessage(context.getMessage()).trim().toLowerCase(); // Get pod name from Kubernetes downward API @@ -234,21 +233,21 @@ public void execute(RequestContext context, EventQueue eventQueue) throws JSONRP if ("complete".equals(messageText)) { // Completion trigger - add final artifact and complete String artifactText = "Completed by " + podName; - List> parts = List.of(new TextPart(artifactText, null)); - updater.addArtifact(parts); - updater.complete(); // Transition to COMPLETED state + List> parts = List.of(new TextPart(artifactText)); + agentEmitter.addArtifact(parts); + agentEmitter.complete(); // Transition to COMPLETED state } else if (context.getTask() == null) { // Initial "start" message - create task in SUBMITTED → WORKING state - updater.submit(); - updater.startWork(); + agentEmitter.submit(); + agentEmitter.startWork(); String artifactText = "Started by " + podName; - List> parts = List.of(new TextPart(artifactText, null)); - updater.addArtifact(parts); + List> parts = List.of(new TextPart(artifactText)); + agentEmitter.addArtifact(parts); } else { // Subsequent "process" messages - add artifacts (fire-and-forget, stays WORKING) String artifactText = "Processed by " + podName; - List> parts = List.of(new TextPart(artifactText, null)); - updater.addArtifact(parts); + List> parts = List.of(new TextPart(artifactText)); + agentEmitter.addArtifact(parts); } } ``` diff --git a/examples/cloud-deployment/server/src/main/java/io/a2a/examples/cloud/CloudAgentExecutorProducer.java b/examples/cloud-deployment/server/src/main/java/io/a2a/examples/cloud/CloudAgentExecutorProducer.java index d3d36044d..976826fb3 100644 --- a/examples/cloud-deployment/server/src/main/java/io/a2a/examples/cloud/CloudAgentExecutorProducer.java +++ b/examples/cloud-deployment/server/src/main/java/io/a2a/examples/cloud/CloudAgentExecutorProducer.java @@ -7,8 +7,7 @@ import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; -import io.a2a.server.events.EventQueue; -import io.a2a.server.tasks.TaskUpdater; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.A2AError; import io.a2a.spec.InternalError; import io.a2a.spec.Message; @@ -46,8 +45,7 @@ public AgentExecutor agentExecutor() { private static class CloudAgentExecutor implements AgentExecutor { @Override - public void execute(RequestContext context, EventQueue eventQueue) throws A2AError { - TaskUpdater updater = new TaskUpdater(context, eventQueue); + public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError { try { // Extract user message and normalize @@ -75,18 +73,18 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr LOGGER.info("Completion requested on pod: {}", podName); String artifactText = "Completed by " + podName; List> parts = List.of(new TextPart(artifactText)); - updater.addArtifact(parts); - updater.complete(); + agentEmitter.addArtifact(parts); + agentEmitter.complete(); LOGGER.info("Task completed on pod: {}", podName); } else if (context.getTask() == null) { // Initial message - create task in SUBMITTED → WORKING state LOGGER.info("Creating new task on pod: {}", podName); - updater.submit(); - updater.startWork(); + agentEmitter.submit(); + agentEmitter.startWork(); String artifactText = "Started by " + podName; List> parts = List.of(new TextPart(artifactText)); - updater.addArtifact(parts); + agentEmitter.addArtifact(parts); LOGGER.info("Task created and started on pod: {}", podName); } else { @@ -94,7 +92,7 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr LOGGER.info("Adding artifact on pod: {}", podName); String artifactText = "Processed by " + podName; List> parts = List.of(new TextPart(artifactText)); - updater.addArtifact(parts); + agentEmitter.addArtifact(parts); // No state change - task remains in WORKING LOGGER.info("Artifact added on pod: {}", podName); } @@ -109,10 +107,9 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr } @Override - public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError { + public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError { LOGGER.info("Task cancellation requested"); - TaskUpdater updater = new TaskUpdater(context, eventQueue); - updater.cancel(); + agentEmitter.cancel(); } /** diff --git a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java index 4bba596db..3c3be977b 100644 --- a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java +++ b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java @@ -3,10 +3,9 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Produces; -import io.a2a.A2A; import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; -import io.a2a.server.events.EventQueue; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.A2AError; import io.a2a.spec.UnsupportedOperationError; @@ -17,12 +16,12 @@ public class AgentExecutorProducer { public AgentExecutor agentExecutor() { return new AgentExecutor() { @Override - public void execute(RequestContext context, EventQueue eventQueue) throws A2AError { - eventQueue.enqueueEvent(A2A.toAgentMessage("Hello World")); + public void execute(RequestContext context, AgentEmitter emitter) throws A2AError { + emitter.sendMessage("Hello World"); } @Override - public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError { + public void cancel(RequestContext context, AgentEmitter emitter) throws A2AError { throw new UnsupportedOperationError(); } }; diff --git a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreTestAgentExecutor.java b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreTestAgentExecutor.java index df29c1688..4061a3619 100644 --- a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreTestAgentExecutor.java +++ b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreTestAgentExecutor.java @@ -8,9 +8,8 @@ import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; -import io.a2a.server.events.EventQueue; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.server.tasks.PushNotificationSender; -import io.a2a.server.tasks.TaskUpdater; import io.a2a.spec.A2AError; import io.a2a.spec.InvalidRequestError; import io.a2a.spec.Message; @@ -33,31 +32,29 @@ public class JpaDatabasePushNotificationConfigStoreTestAgentExecutor { public AgentExecutor agentExecutor() { return new AgentExecutor() { @Override - public void execute(RequestContext context, EventQueue eventQueue) throws A2AError { - TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue); + public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError { String command = getLastTextPart(context.getMessage()); // Switch based on the command from the test client switch (command) { case "create": - taskUpdater.submit(); + agentEmitter.submit(); break; case "update": // Perform a meaningful update, like adding an artifact. // This state change is what will trigger the notification. - taskUpdater.addArtifact(List.of(new TextPart("updated-artifact")), "art-1", "test", null); + agentEmitter.addArtifact(List.of(new TextPart("updated-artifact")), "art-1", "test", null); break; default: // On the first message (which might have no text), just submit. - taskUpdater.submit(); + agentEmitter.submit(); break; } } @Override - public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError { - TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue); - taskUpdater.cancel(); + public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError { + agentEmitter.cancel(); } }; } diff --git a/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java b/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java index 14b4c1f51..3865f7c20 100644 --- a/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java +++ b/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java @@ -151,7 +151,7 @@ void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedEx String taskId = "test-task-2"; EventQueue queue = queueManager.createOrTap(taskId); - ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent); + ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, getTaskStatusUpdateEventWithNewId(taskId)); queueManager.onReplicatedEvent(replicatedEvent); assertEquals(0, strategy.getCallCount()); @@ -174,7 +174,7 @@ void testReplicationStrategyWithCountingImplementation() throws InterruptedExcep assertEquals(taskId, countingStrategy.getLastTaskId()); assertEquals(event, countingStrategy.getLastEvent()); - ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent); + ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, getTaskStatusUpdateEventWithNewId(taskId)); queueManager.onReplicatedEvent(replicatedEvent); assertEquals(2, countingStrategy.getCallCount()); @@ -488,7 +488,7 @@ void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws Interrup // Create queue and enqueue an event EventQueue queue = queueManager.createOrTap(taskId); - queue.enqueueEvent(testEvent); + queue.enqueueEvent(getTaskStatusUpdateEventWithNewId(taskId)); // Dequeue to clear the queue try { @@ -613,6 +613,11 @@ void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedExcept "Second event should be QueueClosedEvent"); } + private TaskStatusUpdateEvent getTaskStatusUpdateEventWithNewId(String taskId) { + return TaskStatusUpdateEvent.builder((TaskStatusUpdateEvent) testEvent).taskId(taskId).build(); + } + + private static class NoOpReplicationStrategy implements ReplicationStrategy { @Override public void send(String taskId, Event event) { diff --git a/extras/queue-manager-replicated/tests-multi-instance/quarkus-common/src/main/java/io/a2a/extras/queuemanager/replicated/tests/multiinstance/common/MultiInstanceReplicationAgentExecutor.java b/extras/queue-manager-replicated/tests-multi-instance/quarkus-common/src/main/java/io/a2a/extras/queuemanager/replicated/tests/multiinstance/common/MultiInstanceReplicationAgentExecutor.java index 546d22939..d51ef8f32 100644 --- a/extras/queue-manager-replicated/tests-multi-instance/quarkus-common/src/main/java/io/a2a/extras/queuemanager/replicated/tests/multiinstance/common/MultiInstanceReplicationAgentExecutor.java +++ b/extras/queue-manager-replicated/tests-multi-instance/quarkus-common/src/main/java/io/a2a/extras/queuemanager/replicated/tests/multiinstance/common/MultiInstanceReplicationAgentExecutor.java @@ -2,8 +2,7 @@ import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; -import io.a2a.server.events.EventQueue; -import io.a2a.server.tasks.TaskUpdater; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.A2AError; import io.a2a.spec.Task; import io.a2a.spec.TextPart; @@ -18,9 +17,8 @@ */ public class MultiInstanceReplicationAgentExecutor implements AgentExecutor { @Override - public void execute(RequestContext context, EventQueue eventQueue) throws A2AError { + public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError { Task task = context.getTask(); - TaskUpdater updater = new TaskUpdater(context, eventQueue); // Check if message contains "close" signal boolean shouldClose = context.getMessage().parts().stream() @@ -30,19 +28,18 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr if (shouldClose) { // Close the task - updater.complete(); + agentEmitter.complete(); } else if (task == null) { // First message - create task in SUBMITTED state - updater.submit(); + agentEmitter.submit(); } else { // Subsequent messages - add as artifact - updater.addArtifact(context.getMessage().parts()); + agentEmitter.addArtifact(context.getMessage().parts()); } } @Override - public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError { - TaskUpdater updater = new TaskUpdater(context, eventQueue); - updater.cancel(); + public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError { + agentEmitter.cancel(); } } diff --git a/extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/ReplicationTestAgentExecutor.java b/extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/ReplicationTestAgentExecutor.java index 8e4db21dd..8e934df7e 100644 --- a/extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/ReplicationTestAgentExecutor.java +++ b/extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/ReplicationTestAgentExecutor.java @@ -7,8 +7,7 @@ import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; -import io.a2a.server.events.EventQueue; -import io.a2a.server.tasks.TaskUpdater; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.A2AError; import io.a2a.spec.InvalidRequestError; import io.a2a.spec.Message; @@ -28,27 +27,25 @@ public class ReplicationTestAgentExecutor { public AgentExecutor agentExecutor() { return new AgentExecutor() { @Override - public void execute(RequestContext context, EventQueue eventQueue) throws A2AError { - - TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue); + public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError { String lastText = getLastTextPart(context.getMessage()); switch (lastText) { case "create": // Submit task - this should trigger TaskStatusUpdateEvent - taskUpdater.submit(); + agentEmitter.submit(); break; case "working": // Move task to WORKING state without completing - keeps queue alive - taskUpdater.submit(); - taskUpdater.startWork(); + agentEmitter.submit(); + agentEmitter.startWork(); break; case "complete": // Complete the task - should trigger poison pill generation - taskUpdater.submit(); - taskUpdater.startWork(); - taskUpdater.addArtifact(List.of(new TextPart("Task completed"))); - taskUpdater.complete(); + agentEmitter.submit(); + agentEmitter.startWork(); + agentEmitter.addArtifact(List.of(new TextPart("Task completed"))); + agentEmitter.complete(); break; default: throw new InvalidRequestError("Unknown command: " + lastText); @@ -56,9 +53,8 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr } @Override - public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError { - TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue); - taskUpdater.cancel(); + public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError { + agentEmitter.cancel(); } }; } diff --git a/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTestAgentExecutor.java b/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTestAgentExecutor.java index f3b310750..fd530a761 100644 --- a/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTestAgentExecutor.java +++ b/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTestAgentExecutor.java @@ -7,8 +7,7 @@ import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; -import io.a2a.server.events.EventQueue; -import io.a2a.server.tasks.TaskUpdater; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.A2AError; import io.a2a.spec.InvalidRequestError; import io.a2a.spec.Message; @@ -17,7 +16,7 @@ import io.quarkus.arc.profile.IfBuildProfile; /** - * Simple test AgentExecutor that responds to messages and uses TaskUpdater.addArtifact() + * Simple test AgentExecutor that responds to messages and uses AgentEmitter.addArtifact() * to trigger TaskUpdateEvents for our integration test. */ @IfBuildProfile("test") @@ -28,18 +27,16 @@ public class JpaDatabaseTaskStoreTestAgentExecutor { public AgentExecutor agentExecutor() { return new AgentExecutor() { @Override - public void execute(RequestContext context, EventQueue eventQueue) throws A2AError { + public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError { System.out.println("TestAgentExecutor.execute() called for task: " + context.getTaskId()); System.out.println("Message " + context.getMessage()); - - TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue); String lastText = getLastTextPart(context.getMessage()); switch (lastText) { case "create": - taskUpdater.submit(); + agentEmitter.submit(); break; case "add-artifact": - taskUpdater.addArtifact(List.of(new TextPart(lastText)), "art-1", "test", null); + agentEmitter.addArtifact(List.of(new TextPart(lastText)), "art-1", "test", null); break; default: throw new InvalidRequestError(lastText + " is unknown"); @@ -47,9 +44,8 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr } @Override - public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError { - TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue); - taskUpdater.cancel(); + public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError { + agentEmitter.cancel(); } }; } diff --git a/server-common/src/main/java/io/a2a/server/agentexecution/AgentExecutor.java b/server-common/src/main/java/io/a2a/server/agentexecution/AgentExecutor.java index 0dfce6088..08d6b56d9 100644 --- a/server-common/src/main/java/io/a2a/server/agentexecution/AgentExecutor.java +++ b/server-common/src/main/java/io/a2a/server/agentexecution/AgentExecutor.java @@ -1,6 +1,7 @@ package io.a2a.server.agentexecution; import io.a2a.server.events.EventQueue; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.A2AError; /** @@ -17,9 +18,8 @@ * asynchronously in a background thread pool when requests arrive from transport layers. * Your implementation should: *
    - *
  • Use the {@link EventQueue} to enqueue task status updates and artifacts
  • - *
  • Use {@link io.a2a.server.tasks.TaskUpdater} helper for common lifecycle operations
  • - *
  • Handle cancellation via the {@link #cancel(RequestContext, EventQueue)} method
  • + *
  • Use the {@link AgentEmitter} to send messages, update task status, and stream artifacts
  • + *
  • Handle cancellation via the {@link #cancel(RequestContext, AgentEmitter)} method
  • *
  • Be thread-safe if maintaining state across invocations
  • *
* @@ -56,14 +56,12 @@ * } * * @Override - * public void execute(RequestContext context, EventQueue eventQueue) { - * TaskUpdater updater = new TaskUpdater(context, eventQueue); - * + * public void execute(RequestContext context, AgentEmitter emitter) { * // Initialize task if this is a new conversation * if (context.getTask() == null) { - * updater.submit(); + * emitter.submit(); * } - * updater.startWork(); + * emitter.startWork(); * * // Extract user input from the message * String userMessage = context.getUserInput("\n"); @@ -72,14 +70,14 @@ * String weatherData = weatherService.getWeather(userMessage); * * // Return result as artifact - * updater.addArtifact(List.of(new TextPart(weatherData, null))); - * updater.complete(); + * emitter.addArtifact(List.of(new TextPart(weatherData, null))); + * emitter.complete(); * } * * @Override - * public void cancel(RequestContext context, EventQueue eventQueue) { + * public void cancel(RequestContext context, AgentEmitter emitter) { * // Clean up resources and mark as canceled - * new TaskUpdater(context, eventQueue).cancel(); + * emitter.cancel(); * } * } * } @@ -87,16 +85,15 @@ *

Streaming Results

* For long-running operations or LLM streaming, enqueue multiple artifacts: *
{@code
- * updater.startWork();
+ * emitter.startWork();
  * for (String chunk : llmService.stream(userInput)) {
- *     updater.addArtifact(List.of(new TextPart(chunk, null)));
+ *     emitter.addArtifact(List.of(new TextPart(chunk, null)));
  * }
- * updater.complete();  // Final event closes the queue
+ * emitter.complete();  // Final event closes the queue
  * }
* * @see RequestContext - * @see io.a2a.server.tasks.TaskUpdater - * @see io.a2a.server.events.EventQueue + * @see AgentEmitter * @see io.a2a.server.requesthandlers.DefaultRequestHandler * @see io.a2a.spec.AgentCard */ @@ -111,15 +108,15 @@ public interface AgentExecutor { *

*

* Important: Don't throw exceptions for business logic errors. Instead, use - * {@code updater.fail(errorMessage)} to communicate failures to the client gracefully. + * {@code emitter.fail(errorMessage)} to communicate failures to the client gracefully. * Only throw {@link A2AError} for truly exceptional conditions. *

* * @param context the request context containing the message, task state, and configuration - * @param eventQueue the queue for enqueueing status updates and artifacts + * @param emitter the agent emitter for sending messages, updating status, and streaming artifacts * @throws A2AError if execution fails catastrophically (exception propagates to client) */ - void execute(RequestContext context, EventQueue eventQueue) throws A2AError; + void execute(RequestContext context, AgentEmitter emitter) throws A2AError; /** * Cancels an ongoing agent execution. @@ -128,11 +125,11 @@ public interface AgentExecutor { * You should: *
    *
  • Stop any ongoing work (interrupt LLM calls, cancel API requests)
  • - *
  • Enqueue a CANCELED status event (typically via {@code TaskUpdater.cancel()})
  • + *
  • Enqueue a CANCELED status event (typically via {@code emitter.cancel()})
  • *
  • Clean up resources (close connections, release locks)
  • *
*

- * Note: The {@link #execute(RequestContext, EventQueue)} method may still be + * Note: The {@link #execute(RequestContext, AgentEmitter)} method may still be * running on another thread. Use appropriate synchronization or interruption mechanisms * if your agent maintains cancellable state. *

@@ -146,9 +143,9 @@ public interface AgentExecutor { * * * @param context the request context for the task being canceled - * @param eventQueue the queue for enqueueing the cancellation event + * @param emitter the agent emitter for sending the cancellation event * @throws io.a2a.spec.TaskNotCancelableError if this agent does not support cancellation * @throws A2AError if cancellation is supported but failed to execute */ - void cancel(RequestContext context, EventQueue eventQueue) throws A2AError; + void cancel(RequestContext context, AgentEmitter emitter) throws A2AError; } diff --git a/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java b/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java index e79298703..82a8b9e17 100644 --- a/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java +++ b/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java @@ -35,7 +35,7 @@ * *

Common Usage Patterns

*
{@code
- * public void execute(RequestContext context, EventQueue queue) {
+ * public void execute(RequestContext context, AgentEmitter emitter) {
  *     // Check if this is a new conversation or continuation
  *     Task existingTask = context.getTask();
  *     if (existingTask == null) {
diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
index 99f8bc2dc..5bbdb3a14 100644
--- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java
+++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
@@ -13,6 +13,7 @@
 import io.a2a.server.tasks.TaskStateProvider;
 import io.a2a.spec.Event;
 import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
 import io.a2a.spec.TaskStatusUpdateEvent;
 import org.jspecify.annotations.Nullable;
 import org.slf4j.Logger;
@@ -431,6 +432,9 @@ public void enqueueItem(EventQueueItem item) {
             // We bypass the parent's closed check and enqueue directly
             Event event = item.getEvent();
 
+            // Validate event taskId matches queue taskId
+            validateEventIds(event);
+
             // Check if this is a final event BEFORE submitting to MainEventBus
             // If it is, notify all children to expect it (so they wait for MainEventBusProcessor)
             if (isFinalEvent(event)) {
@@ -458,6 +462,47 @@ public void enqueueItem(EventQueueItem item) {
             mainEventBus.submit(taskId, this, item);
         }
 
+        /**
+         * Validates that events with taskId fields match this queue's taskId.
+         *
+         * 

Validation Rules: + *

    + *
  • Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent: MUST match queue taskId
  • + *
  • Message: taskId is OPTIONAL, not validated (can exist without tasks)
  • + *
  • Other events: no validation
  • + *
  • Null queue taskId: skip validation (initialization phase)
  • + *
+ * + * @param event the event to validate + * @throws IllegalArgumentException if event has mismatched taskId + */ + private void validateEventIds(Event event) { + if (taskId == null) { + return; // Allow any event during initialization + } + + String eventTaskId = null; + String eventType = null; + + if (event instanceof Task task) { + eventTaskId = task.id(); + eventType = "Task"; + } else if (event instanceof TaskStatusUpdateEvent statusEvent) { + eventTaskId = statusEvent.taskId(); + eventType = "TaskStatusUpdateEvent"; + } else if (event instanceof TaskArtifactUpdateEvent artifactEvent) { + eventTaskId = artifactEvent.taskId(); + eventType = "TaskArtifactUpdateEvent"; + } + // Note: Message.taskId is NOT validated - messages can exist independently + + if (eventTaskId != null && !eventTaskId.equals(taskId)) { + throw new IllegalArgumentException( + String.format("Event taskId mismatch: queue=%s, event=%s, eventType=%s", + taskId, eventTaskId, eventType)); + } + } + /** * Checks if an event represents a final task state. */ diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index c476c8741..40e7954e6 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -38,6 +38,7 @@ import io.a2a.server.events.MainEventBusProcessor; import io.a2a.server.events.MainEventBusProcessorCallback; import io.a2a.server.events.QueueManager; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.server.tasks.PushNotificationConfigStore; import io.a2a.server.tasks.PushNotificationSender; import io.a2a.server.tasks.ResultAggregator; @@ -100,7 +101,7 @@ *
  • Transport calls {@link #onMessageSend(MessageSendParams, ServerCallContext)}
  • *
  • Initialize {@link TaskManager} and {@link RequestContext}
  • *
  • Create or tap {@link EventQueue} via {@link QueueManager}
  • - *
  • Execute {@link AgentExecutor#execute(RequestContext, EventQueue)} asynchronously in background thread pool
  • + *
  • Execute {@link AgentExecutor#execute(RequestContext, AgentEmitter)} asynchronously in background thread pool
  • *
  • Consume events from queue on Vert.x worker thread via {@link EventConsumer}
  • *
  • For blocking=true: wait for agent completion and full event consumption
  • *
  • Return {@link Task} or {@link Message} to transport
  • @@ -111,7 +112,7 @@ *
      *
    1. Transport calls {@link #onMessageSendStream(MessageSendParams, ServerCallContext)}
    2. *
    3. Initialize components (same as blocking)
    4. - *
    5. Execute {@link AgentExecutor#execute(RequestContext, EventQueue)} asynchronously
    6. + *
    7. Execute {@link AgentExecutor#execute(RequestContext, AgentEmitter)} asynchronously
    8. *
    9. Return {@link java.util.concurrent.Flow.Publisher Flow.Publisher}<StreamingEventKind> immediately
    10. *
    11. Events stream to client as they arrive in the queue
    12. *
    13. On client disconnect: continue consumption in background (fire-and-forget)
    14. @@ -130,7 +131,7 @@ *

      Threading Model

      *
        *
      • Vert.x worker threads: Execute request handler methods (onMessageSend, etc.)
      • - *
      • Agent-executor pool (@Internal): Execute {@link AgentExecutor#execute(RequestContext, EventQueue)}
      • + *
      • Agent-executor pool (@Internal): Execute {@link AgentExecutor#execute(RequestContext, AgentEmitter)}
      • *
      • Background cleanup: {@link java.util.concurrent.CompletableFuture CompletableFuture} async tasks
      • *
      *

      @@ -380,14 +381,14 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor, eventConsumerExecutor); EventQueue queue = queueManager.createOrTap(task.id()); - agentExecutor.cancel( - requestContextBuilder.get() - .setTaskId(task.id()) - .setContextId(task.contextId()) - .setTask(task) - .setServerCallContext(context) - .build(), - queue); + RequestContext cancelRequestContext = requestContextBuilder.get() + .setTaskId(task.id()) + .setContextId(task.contextId()) + .setTask(task) + .setServerCallContext(context) + .build(); + AgentEmitter emitter = new AgentEmitter(cancelRequestContext, queue); + agentExecutor.cancel(cancelRequestContext, emitter); Optional.ofNullable(runningAgents.get(task.id())) .ifPresent(cf -> cf.cancel(true)); @@ -873,7 +874,8 @@ private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestCont @Override public void run() { LOGGER.debug("Agent execution starting for task {}", taskId); - agentExecutor.execute(requestContext, queue); + AgentEmitter emitter = new AgentEmitter(requestContext, queue); + agentExecutor.execute(requestContext, emitter); LOGGER.debug("Agent execution completed for task {}", taskId); // The consumer (running on the Vert.x worker thread) handles queue lifecycle. // This avoids blocking agent-executor threads waiting for worker threads. diff --git a/server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java b/server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java new file mode 100644 index 000000000..8b9e64a14 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java @@ -0,0 +1,617 @@ +package io.a2a.server.tasks; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.a2a.server.agentexecution.RequestContext; +import io.a2a.server.events.EventQueue; +import io.a2a.spec.A2AError; +import io.a2a.spec.Artifact; +import io.a2a.spec.Event; +import io.a2a.spec.Message; +import io.a2a.spec.Part; +import io.a2a.spec.Task; +import io.a2a.spec.TaskArtifactUpdateEvent; +import io.a2a.spec.TaskState; +import io.a2a.spec.TaskStatus; +import io.a2a.spec.TaskStatusUpdateEvent; +import io.a2a.spec.TextPart; +import org.jspecify.annotations.Nullable; + +/** + * Helper for emitting events from AgentExecutor implementations. + * + *

      AgentEmitter provides a simplified API for agents to communicate with clients through + * the A2A protocol. It handles both task lifecycle management and direct message sending, + * automatically populating events with correct task and context IDs from the RequestContext. + * + *

      Core Capabilities

      + *
        + *
      • Task Lifecycle: {@link #submit()}, {@link #startWork()}, {@link #complete()}, + * {@link #fail()}, {@link #cancel()}, {@link #reject()}
      • + *
      • Message Sending: {@link #sendMessage(String)}, {@link #sendMessage(List)}, + * {@link #sendMessage(List, Map)}
      • + *
      • Artifact Streaming: {@link #addArtifact(List)}, {@link #addArtifact(List, String, String, Map)}
      • + *
      • Auth/Input Requirements: {@link #requiresAuth()}, {@link #requiresInput()}
      • + *
      • Custom Events: {@link #taskBuilder()}, {@link #messageBuilder()}, {@link #addTask(Task)}, {@link #emitEvent(Event)}
      • + *
      + * + *

      Usage Patterns

      + * + *

      Simple Message Response (No Task)

      + *
      {@code
      + * public void execute(RequestContext context, AgentEmitter emitter) {
      + *     String response = processRequest(context.getUserInput("\n"));
      + *     emitter.sendMessage(response);
      + * }
      + * }
      + * + *

      Task Lifecycle with Artifacts

      + *
      {@code
      + * public void execute(RequestContext context, AgentEmitter emitter) {
      + *     if (context.getTask() == null) {
      + *         emitter.submit();  // Create task in SUBMITTED state
      + *     }
      + *     emitter.startWork();  // Transition to WORKING
      + *
      + *     // Process and stream results
      + *     List> results = doWork(context.getUserInput("\n"));
      + *     emitter.addArtifact(results);
      + *
      + *     emitter.complete();  // Mark as COMPLETED
      + * }
      + * }
      + * + *

      Streaming Response (LLM)

      + *
      {@code
      + * public void execute(RequestContext context, AgentEmitter emitter) {
      + *     emitter.startWork();
      + *
      + *     for (String chunk : llmService.stream(context.getUserInput("\n"))) {
      + *         emitter.addArtifact(List.of(new TextPart(chunk)));
      + *     }
      + *
      + *     emitter.complete();
      + * }
      + * }
      + * + *

      Event ID Management

      + * All emitted events are automatically populated with: + *
        + *
      • taskId: From RequestContext (may be null for message-only responses)
      • + *
      • contextId: From RequestContext
      • + *
      • messageId: Generated UUID for messages
      • + *
      • artifactId: Generated UUID for artifacts (unless explicitly provided)
      • + *
      + * + * Events are validated by the EventQueue to ensure taskId correctness. + * + * @see io.a2a.server.agentexecution.AgentExecutor + * @see RequestContext + * @see EventQueue + * @since 1.0.0 + */ +public class AgentEmitter { + private final EventQueue eventQueue; + private final @Nullable String taskId; + private final @Nullable String contextId; + private final AtomicBoolean terminalStateReached = new AtomicBoolean(false); + private final Object stateLock = new Object(); + + /** + * Creates a new AgentEmitter for the given request context and event queue. + * + * @param context the request context containing task and context IDs + * @param eventQueue the event queue for enqueueing events + */ + public AgentEmitter(RequestContext context, EventQueue eventQueue) { + this.eventQueue = eventQueue; + this.taskId = context.getTaskId(); + this.contextId = context.getContextId(); + } + + private void updateStatus(TaskState taskState) { + updateStatus(taskState, null, taskState.isFinal()); + } + + /** + * Updates the task status to the given state with an optional message. + * + * @param taskState the new task state + * @param message optional message to include with the status update + */ + public void updateStatus(TaskState taskState, @Nullable Message message) { + updateStatus(taskState, message, taskState.isFinal()); + } + + /** + * Updates the task status to the given state with an optional message and finality flag. + * + * @param state the new task state + * @param message optional message to include with the status update + * @param isFinal whether this is a final status (prevents further updates) + */ + public void updateStatus(TaskState state, @Nullable Message message, boolean isFinal) { + synchronized (stateLock) { + // Check if we're already in a terminal state + if (terminalStateReached.get()) { + throw new IllegalStateException("Cannot update task status - terminal state already reached"); + } + + // If this is a final state, set the flag + if (isFinal) { + terminalStateReached.set(true); + } + + TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder() + .taskId(taskId) + .contextId(contextId) + .isFinal(isFinal) + .status(new TaskStatus(state, message, null)) + .build(); + eventQueue.enqueueEvent(event); + } + } + + /** + * Returns the context ID for this emitter. + * + * @return the context ID, or null if not available + */ + public @Nullable String getContextId() { + return this.contextId; + } + + /** + * Returns the task ID for this emitter. + * + * @return the task ID, or null if no task is associated + */ + public @Nullable String getTaskId() { + return this.taskId; + } + + /** + * Adds an artifact with the given parts to the task. + * + * @param parts the parts to include in the artifact + */ + public void addArtifact(List> parts) { + addArtifact(parts, null, null, null); + } + + /** + * Adds an artifact with the given parts, artifact ID, name, and metadata. + * + * @param parts the parts to include in the artifact + * @param artifactId optional artifact ID (generated if null) + * @param name optional artifact name + * @param metadata optional metadata map + */ + public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata) { + addArtifact(parts, artifactId, name, metadata, null, null); + } + + /** + * Adds an artifact with all optional parameters. + * + * @param parts the parts to include in the artifact + * @param artifactId optional artifact ID (generated if null) + * @param name optional artifact name + * @param metadata optional metadata map + * @param append whether to append to an existing artifact + * @param lastChunk whether this is the last chunk in a streaming sequence + */ + public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata, + @Nullable Boolean append, @Nullable Boolean lastChunk) { + if (artifactId == null) { + artifactId = UUID.randomUUID().toString(); + } + TaskArtifactUpdateEvent event = TaskArtifactUpdateEvent.builder() + .taskId(taskId) + .contextId(contextId) + .artifact( + Artifact.builder() + .artifactId(artifactId) + .name(name) + .parts(parts) + .metadata(metadata) + .build() + ) + .append(append) + .lastChunk(lastChunk) + .build(); + eventQueue.enqueueEvent(event); + } + + /** + * Marks the task as COMPLETED. + */ + public void complete() { + complete(null); + } + + /** + * Marks the task as COMPLETED with an optional message. + * + * @param message optional message to include with completion + */ + public void complete(@Nullable Message message) { + updateStatus(TaskState.COMPLETED, message); + } + + /** + * Marks the task as FAILED. + */ + public void fail() { + fail((Message) null); + } + + /** + * Marks the task as FAILED with an optional message. + * + * @param message optional message to include with failure + */ + public void fail(@Nullable Message message) { + updateStatus(TaskState.FAILED, message); + } + + /** + * Enqueues an A2A error event and marks the task as FAILED. + *

      + * Use this when you need to fail the task with a specific A2A error (such as + * {@link io.a2a.spec.UnsupportedOperationError}, {@link io.a2a.spec.InvalidRequestError}, + * {@link io.a2a.spec.TaskNotFoundError}, etc.) that should be sent to the client. + *

      + *

      + * This is a convenience method that combines error enqueueing with status transition. + * It enqueues the error event first, then marks the task as FAILED. + *

      + *

      Example usage: + *

      {@code
      +     * public void execute(RequestContext context, AgentEmitter emitter) {
      +     *     if (!isSupported(context.getMessage())) {
      +     *         emitter.fail(new UnsupportedOperationError("Feature not supported"));
      +     *         return;
      +     *     }
      +     *     // ... normal processing
      +     * }
      +     * }
      + * + * @param error the A2A error to enqueue and send to the client + * @since 1.0.0 + */ + public void fail(A2AError error) { + eventQueue.enqueueEvent(error); + updateStatus(TaskState.FAILED); + } + + /** + * Marks the task as SUBMITTED. + */ + public void submit() { + submit(null); + } + + /** + * Marks the task as SUBMITTED with an optional message. + * + * @param message optional message to include + */ + public void submit(@Nullable Message message) { + updateStatus(TaskState.SUBMITTED, message); + } + + /** + * Marks the task as WORKING (actively being processed). + */ + public void startWork() { + startWork(null); + } + + /** + * Marks the task as WORKING with an optional message. + * + * @param message optional message to include + */ + public void startWork(@Nullable Message message) { + updateStatus(TaskState.WORKING, message); + } + + /** + * Marks the task as CANCELED. + */ + public void cancel() { + cancel(null); + } + + /** + * Marks the task as CANCELED with an optional message. + * + * @param message optional message to include + */ + public void cancel(@Nullable Message message) { + updateStatus(TaskState.CANCELED, message); + } + + /** + * Marks the task as REJECTED. + */ + public void reject() { + reject(null); + } + + /** + * Marks the task as REJECTED with an optional message. + * + * @param message optional message to include + */ + public void reject(@Nullable Message message) { + updateStatus(TaskState.REJECTED, message); + } + + /** + * Marks the task as INPUT_REQUIRED, indicating the agent needs user input to continue. + */ + public void requiresInput() { + requiresInput(null, false); + } + + /** + * Marks the task as INPUT_REQUIRED with an optional message. + * + * @param message optional message to include + */ + public void requiresInput(@Nullable Message message) { + requiresInput(message, false); + } + + /** + * Marks the task as INPUT_REQUIRED with a finality flag. + * + * @param isFinal whether this is a final status (prevents further updates) + */ + public void requiresInput(boolean isFinal) { + requiresInput(null, isFinal); + } + + /** + * Marks the task as INPUT_REQUIRED with an optional message and finality flag. + * + * @param message optional message to include + * @param isFinal whether this is a final status (prevents further updates) + */ + public void requiresInput(@Nullable Message message, boolean isFinal) { + updateStatus(TaskState.INPUT_REQUIRED, message, isFinal); + } + + /** + * Marks the task as AUTH_REQUIRED, indicating the agent needs authentication to continue. + */ + public void requiresAuth() { + requiresAuth(null, false); + } + + /** + * Marks the task as AUTH_REQUIRED with an optional message. + * + * @param message optional message to include + */ + public void requiresAuth(@Nullable Message message) { + requiresAuth(message, false); + } + + /** + * Marks the task as AUTH_REQUIRED with a finality flag. + * + * @param isFinal whether this is a final status (prevents further updates) + */ + public void requiresAuth(boolean isFinal) { + requiresAuth(null, isFinal); + } + + /** + * Marks the task as AUTH_REQUIRED with an optional message and finality flag. + * + * @param message optional message to include + * @param isFinal whether this is a final status (prevents further updates) + */ + public void requiresAuth(@Nullable Message message, boolean isFinal) { + updateStatus(TaskState.AUTH_REQUIRED, message, isFinal); + } + + /** + * Creates a new agent message with the given parts and metadata. + * Pre-populates the message with agent role, task ID, context ID, and a generated message ID. + * + * @param parts the parts to include in the message + * @param metadata optional metadata to attach to the message + * @return a new Message object ready to be sent + */ + public Message newAgentMessage(List> parts, @Nullable Map metadata) { + return Message.builder() + .role(Message.Role.AGENT) + .taskId(taskId) + .contextId(contextId) + .messageId(UUID.randomUUID().toString()) + .metadata(metadata) + .parts(parts) + .build(); + } + + /** + * Sends a simple text message to the client. + * Convenience method for agents that respond with plain text without creating a task. + * + * @param text the text content to send + */ + public void sendMessage(String text) { + sendMessage(List.of(new TextPart(text))); + } + + /** + * Sends a message with custom parts (text, images, etc.) to the client. + * Use this for rich responses that don't require task lifecycle management. + * + * @param parts the message parts to send + */ + public void sendMessage(List> parts) { + sendMessage(parts, null); + } + + /** + * Sends a message with parts and metadata to the client. + * Creates an agent message with the current task and context IDs (if available) + * and enqueues it to the event queue. + * + * @param parts the message parts to send + * @param metadata optional metadata to attach to the message + */ + public void sendMessage(List> parts, @Nullable Map metadata) { + Message message = newAgentMessage(parts, metadata); + eventQueue.enqueueEvent(message); + } + + /** + * Sends an existing Message object directly to the client. + *

      + * Use this when you need to forward or echo an existing message without creating a new one. + * The message is enqueued as-is, preserving its messageId, metadata, and all other fields. + *

      + *

      + * Note: This is typically used for forwarding user messages or preserving specific + * message properties. For most cases, prefer {@link #sendMessage(String)} or + * {@link #sendMessage(List)} which create new agent messages with generated IDs. + *

      + *

      Example usage: + *

      {@code
      +     * public void execute(RequestContext context, AgentEmitter emitter) {
      +     *     // Echo the user's message back
      +     *     emitter.sendMessage(context.getMessage());
      +     * }
      +     * }
      + * + * @param message the message to send to the client + * @since 1.0.0 + */ + public void sendMessage(Message message) { + eventQueue.enqueueEvent(message); + } + + /** + * Adds a custom Task object to be sent to the client. + *

      + * Use this when you need to create a Task with specific fields (history, artifacts, etc.) + * that the convenience methods like {@link #submit()}, {@link #startWork()}, or + * {@link #complete()} don't provide. + *

      + *

      + * Typical usage pattern: Build a task with {@link #taskBuilder()}, customize it, + * then add it with this method. + *

      + *

      Example usage: + *

      {@code
      +     * public void execute(RequestContext context, AgentEmitter emitter) {
      +     *     // Create a task with specific status and history
      +     *     Task task = emitter.taskBuilder()
      +     *         .status(new TaskStatus(TaskState.SUBMITTED))
      +     *         .history(List.of(context.getMessage()))
      +     *         .build();
      +     *     emitter.addTask(task);
      +     * }
      +     * }
      + * + * @param task the task to add + * @since 1.0.0 + */ + public void addTask(Task task) { + eventQueue.enqueueEvent(task); + } + + /** + * Emits a custom Event object to the client. + *

      + * This is a general-purpose method for emitting any Event type. Most agents should use the + * convenience methods ({@link #sendMessage(String)}, {@link #addTask(Task)}, + * {@link #addArtifact(List)}, {@link #complete()}, etc.), but this method provides flexibility + * for agents that need to create and emit custom events using the event builders. + *

      + *

      Example usage: + *

      {@code
      +     * public void execute(RequestContext context, AgentEmitter emitter) {
      +     *     // Create a custom TaskStatusUpdateEvent
      +     *     TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder()
      +     *         .taskId(context.getTaskId())
      +     *         .contextId(context.getContextId())
      +     *         .status(new TaskStatus(TaskState.WORKING))
      +     *         .isFinal(false)
      +     *         .build();
      +     *     emitter.emitEvent(event);
      +     * }
      +     * }
      + * + * @param event the event to emit + * @since 1.0.0 + */ + public void emitEvent(Event event) { + eventQueue.enqueueEvent(event); + } + + /** + * Creates a Task.Builder pre-populated with the correct task and context IDs. + * Agents can customize other Task fields (status, artifacts, etc.) before calling build(). + * + *

      Example usage: + *

      {@code
      +     * Task task = emitter.taskBuilder()
      +     *     .status(new TaskStatus(TaskState.WORKING))
      +     *     .build();
      +     * }
      + * + * @return a Task.Builder with id and contextId already set + */ + public Task.Builder taskBuilder() { + return Task.builder() + .id(taskId) + .contextId(contextId); + } + + /** + * Creates a Message.Builder pre-populated with agent defaults. + * Sets taskId only if non-null (messages can exist independently of tasks). + * + *

      Pre-populated fields: + *

        + *
      • taskId - set only if this AgentEmitter has a non-null taskId
      • + *
      • contextId - current context ID
      • + *
      • role - Message.Role.AGENT
      • + *
      • messageId - generated UUID
      • + *
      + * + *

      Example usage: + *

      {@code
      +     * Message msg = emitter.messageBuilder()
      +     *     .parts(List.of(new TextPart("Hello")))
      +     *     .metadata(Map.of("key", "value"))
      +     *     .build();
      +     * }
      + * + * @return a Message.Builder with common agent fields already set + */ + public Message.Builder messageBuilder() { + Message.Builder builder = Message.builder() + .contextId(contextId) + .role(Message.Role.AGENT) + .messageId(UUID.randomUUID().toString()); + + // Only set taskId if present (messages can exist without tasks) + if (taskId != null) { + builder.taskId(taskId); + } + + return builder; + } + +} diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java b/server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java deleted file mode 100644 index 022cde56b..000000000 --- a/server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java +++ /dev/null @@ -1,191 +0,0 @@ -package io.a2a.server.tasks; - -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; - -import io.a2a.server.agentexecution.RequestContext; -import io.a2a.server.events.EventQueue; -import io.a2a.spec.Artifact; -import io.a2a.spec.Message; -import io.a2a.spec.Part; -import io.a2a.spec.TaskArtifactUpdateEvent; -import io.a2a.spec.TaskState; -import io.a2a.spec.TaskStatus; -import io.a2a.spec.TaskStatusUpdateEvent; -import org.jspecify.annotations.Nullable; - -public class TaskUpdater { - private final EventQueue eventQueue; - private final @Nullable String taskId; - private final @Nullable String contextId; - private final AtomicBoolean terminalStateReached = new AtomicBoolean(false); - private final Object stateLock = new Object(); - - public TaskUpdater(RequestContext context, EventQueue eventQueue) { - this.eventQueue = eventQueue; - this.taskId = context.getTaskId(); - this.contextId = context.getContextId(); - } - - private void updateStatus(TaskState taskState) { - updateStatus(taskState, null, taskState.isFinal()); - } - - public void updateStatus(TaskState taskState, @Nullable Message message) { - updateStatus(taskState, message, taskState.isFinal()); - } - - public void updateStatus(TaskState state, @Nullable Message message, boolean isFinal) { - synchronized (stateLock) { - // Check if we're already in a terminal state - if (terminalStateReached.get()) { - throw new IllegalStateException("Cannot update task status - terminal state already reached"); - } - - // If this is a final state, set the flag - if (isFinal) { - terminalStateReached.set(true); - } - - TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder() - .taskId(taskId) - .contextId(contextId) - .isFinal(isFinal) - .status(new TaskStatus(state, message, null)) - .build(); - eventQueue.enqueueEvent(event); - } - } - - public @Nullable String getContextId() { - return this.contextId; - } - - public @Nullable String getTaskId() { - return this.taskId; - } - - public void addArtifact(List> parts) { - addArtifact(parts, null, null, null); - } - - public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata) { - addArtifact(parts, artifactId, name, metadata, null, null); - } - - public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata, - @Nullable Boolean append, @Nullable Boolean lastChunk) { - if (artifactId == null) { - artifactId = UUID.randomUUID().toString(); - } - TaskArtifactUpdateEvent event = TaskArtifactUpdateEvent.builder() - .taskId(taskId) - .contextId(contextId) - .artifact( - Artifact.builder() - .artifactId(artifactId) - .name(name) - .parts(parts) - .metadata(metadata) - .build() - ) - .append(append) - .lastChunk(lastChunk) - .build(); - eventQueue.enqueueEvent(event); - } - - public void complete() { - complete(null); - } - - public void complete(@Nullable Message message) { - updateStatus(TaskState.COMPLETED, message); - } - - public void fail() { - fail(null); - } - - public void fail(@Nullable Message message) { - updateStatus(TaskState.FAILED, message); - } - - public void submit() { - submit(null); - } - - public void submit(@Nullable Message message) { - updateStatus(TaskState.SUBMITTED, message); - } - - public void startWork() { - startWork(null); - } - - public void startWork(@Nullable Message message) { - updateStatus(TaskState.WORKING, message); - } - - public void cancel() { - cancel(null); - } - - public void cancel(@Nullable Message message) { - updateStatus(TaskState.CANCELED, message); - } - - public void reject() { - reject(null); - } - - public void reject(@Nullable Message message) { - updateStatus(TaskState.REJECTED, message); - } - - public void requiresInput() { - requiresInput(null, false); - } - - public void requiresInput(@Nullable Message message) { - requiresInput(message, false); - } - - public void requiresInput(boolean isFinal) { - requiresInput(null, isFinal); - } - - public void requiresInput(@Nullable Message message, boolean isFinal) { - updateStatus(TaskState.INPUT_REQUIRED, message, isFinal); - } - - public void requiresAuth() { - requiresAuth(null, false); - } - - public void requiresAuth(@Nullable Message message) { - requiresAuth(message, false); - } - - public void requiresAuth(boolean isFinal) { - requiresAuth(null, isFinal); - } - - public void requiresAuth(@Nullable Message message, boolean isFinal) { - updateStatus(TaskState.AUTH_REQUIRED, message, isFinal); - } - - public Message newAgentMessage(List> parts, @Nullable Map metadata) { - return Message.builder() - .role(Message.Role.AGENT) - .taskId(taskId) - .contextId(contextId) - .messageId(UUID.randomUUID().toString()) - .metadata(metadata) - .parts(parts) - .build(); - } - -} diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java index 9c64f03f9..61803f233 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java @@ -24,7 +24,7 @@ import io.a2a.jsonrpc.common.json.JsonUtil; import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; -import io.a2a.server.events.EventQueue; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.server.events.EventQueueItem; import io.a2a.server.events.EventQueueUtil; import io.a2a.server.events.InMemoryQueueManager; @@ -87,16 +87,16 @@ public class AbstractA2ARequestHandlerTest { public void init() { executor = new AgentExecutor() { @Override - public void execute(RequestContext context, EventQueue eventQueue) throws A2AError { + public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError { if (agentExecutorExecute != null) { - agentExecutorExecute.invoke(context, eventQueue); + agentExecutorExecute.invoke(context, agentEmitter); } } @Override - public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError { + public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError { if (agentExecutorCancel != null) { - agentExecutorCancel.invoke(context, eventQueue); + agentExecutorCancel.invoke(context, agentEmitter); } } }; @@ -168,7 +168,7 @@ private static String loadPreferredTransportFromProperties() { } protected interface AgentExecutorMethod { - void invoke(RequestContext context, EventQueue eventQueue) throws A2AError; + void invoke(RequestContext context, AgentEmitter agentEmitter) throws A2AError; } /** diff --git a/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java b/server-common/src/test/java/io/a2a/server/tasks/AgentEmitterTest.java similarity index 87% rename from server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java rename to server-common/src/test/java/io/a2a/server/tasks/AgentEmitterTest.java index 73da17824..0a59012eb 100644 --- a/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java +++ b/server-common/src/test/java/io/a2a/server/tasks/AgentEmitterTest.java @@ -30,7 +30,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -public class TaskUpdaterTest { +public class AgentEmitterTest { public static final String TEST_TASK_ID = "test-task-id"; public static final String TEST_TASK_CONTEXT_ID = "test-task-context-id"; @@ -48,7 +48,7 @@ public class TaskUpdaterTest { EventQueue eventQueue; private MainEventBus mainEventBus; private MainEventBusProcessor mainEventBusProcessor; - private TaskUpdater taskUpdater; + private AgentEmitter agentEmitter; @@ -69,7 +69,7 @@ public void init() { .setTaskId(TEST_TASK_ID) .setContextId(TEST_TASK_CONTEXT_ID) .build(); - taskUpdater = new TaskUpdater(context, eventQueue); + agentEmitter = new AgentEmitter(context, eventQueue); } @AfterEach @@ -81,7 +81,7 @@ public void cleanup() { @Test public void testAddArtifactWithCustomIdAndName() throws Exception { - taskUpdater.addArtifact(SAMPLE_PARTS, "custom-artifact-id", "Custom Artifact", null); + agentEmitter.addArtifact(SAMPLE_PARTS, "custom-artifact-id", "Custom Artifact", null); EventQueueItem item = eventQueue.dequeueEventItem(5000); assertNotNull(item); Event event = item.getEvent(); @@ -101,147 +101,147 @@ public void testAddArtifactWithCustomIdAndName() throws Exception { @Test public void testCompleteWithoutMessage() throws Exception { - taskUpdater.complete(); + agentEmitter.complete(); checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, null); } @Test public void testCompleteWithMessage() throws Exception { - taskUpdater.complete(SAMPLE_MESSAGE); + agentEmitter.complete(SAMPLE_MESSAGE); checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, SAMPLE_MESSAGE); } @Test public void testSubmitWithoutMessage() throws Exception { - taskUpdater.submit(); + agentEmitter.submit(); checkTaskStatusUpdateEventOnQueue(false, TaskState.SUBMITTED, null); } @Test public void testSubmitWithMessage() throws Exception { - taskUpdater.submit(SAMPLE_MESSAGE); + agentEmitter.submit(SAMPLE_MESSAGE); checkTaskStatusUpdateEventOnQueue(false, TaskState.SUBMITTED, SAMPLE_MESSAGE); } @Test public void testStartWorkWithoutMessage() throws Exception { - taskUpdater.startWork(); + agentEmitter.startWork(); checkTaskStatusUpdateEventOnQueue(false, TaskState.WORKING, null); } @Test public void testStartWorkWithMessage() throws Exception { - taskUpdater.startWork(SAMPLE_MESSAGE); + agentEmitter.startWork(SAMPLE_MESSAGE); checkTaskStatusUpdateEventOnQueue(false, TaskState.WORKING, SAMPLE_MESSAGE); } @Test public void testFailedWithoutMessage() throws Exception { - taskUpdater.fail(); + agentEmitter.fail(); checkTaskStatusUpdateEventOnQueue(true, TaskState.FAILED, null); } @Test public void testFailedWithMessage() throws Exception { - taskUpdater.fail(SAMPLE_MESSAGE); + agentEmitter.fail(SAMPLE_MESSAGE); checkTaskStatusUpdateEventOnQueue(true, TaskState.FAILED, SAMPLE_MESSAGE); } @Test public void testCanceledWithoutMessage() throws Exception { - taskUpdater.cancel(); + agentEmitter.cancel(); checkTaskStatusUpdateEventOnQueue(true, TaskState.CANCELED, null); } @Test public void testCanceledWithMessage() throws Exception { - taskUpdater.cancel(SAMPLE_MESSAGE); + agentEmitter.cancel(SAMPLE_MESSAGE); checkTaskStatusUpdateEventOnQueue(true, TaskState.CANCELED, SAMPLE_MESSAGE); } @Test public void testRejectWithoutMessage() throws Exception { - taskUpdater.reject(); + agentEmitter.reject(); checkTaskStatusUpdateEventOnQueue(true, TaskState.REJECTED, null); } @Test public void testRejectWithMessage() throws Exception { - taskUpdater.reject(SAMPLE_MESSAGE); + agentEmitter.reject(SAMPLE_MESSAGE); checkTaskStatusUpdateEventOnQueue(true, TaskState.REJECTED, SAMPLE_MESSAGE); } @Test public void testRequiresInputWithoutMessage() throws Exception { - taskUpdater.requiresInput(); + agentEmitter.requiresInput(); checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, null); } @Test public void testRequiresInputWithMessage() throws Exception { - taskUpdater.requiresInput(SAMPLE_MESSAGE); + agentEmitter.requiresInput(SAMPLE_MESSAGE); checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, SAMPLE_MESSAGE); } @Test public void testRequiresInputWithFinalTrue() throws Exception { - taskUpdater.requiresInput(true); + agentEmitter.requiresInput(true); checkTaskStatusUpdateEventOnQueue(true, TaskState.INPUT_REQUIRED, null); } @Test public void testRequiresInputWithMessageAndFinalTrue() throws Exception { - taskUpdater.requiresInput(SAMPLE_MESSAGE, true); + agentEmitter.requiresInput(SAMPLE_MESSAGE, true); checkTaskStatusUpdateEventOnQueue(true, TaskState.INPUT_REQUIRED, SAMPLE_MESSAGE); } @Test public void testRequiresAuthWithoutMessage() throws Exception { - taskUpdater.requiresAuth(); + agentEmitter.requiresAuth(); checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, null); } @Test public void testRequiresAuthWithMessage() throws Exception { - taskUpdater.requiresAuth(SAMPLE_MESSAGE); + agentEmitter.requiresAuth(SAMPLE_MESSAGE); checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, SAMPLE_MESSAGE); } @Test public void testRequiresAuthWithFinalTrue() throws Exception { - taskUpdater.requiresAuth(true); + agentEmitter.requiresAuth(true); checkTaskStatusUpdateEventOnQueue(true, TaskState.AUTH_REQUIRED, null); } @Test public void testRequiresAuthWithMessageAndFinalTrue() throws Exception { - taskUpdater.requiresAuth(SAMPLE_MESSAGE, true); + agentEmitter.requiresAuth(SAMPLE_MESSAGE, true); checkTaskStatusUpdateEventOnQueue(true, TaskState.AUTH_REQUIRED, SAMPLE_MESSAGE); } @Test public void testNonTerminalStateUpdatesAllowed() throws Exception { // Non-terminal states should be allowed multiple times - taskUpdater.submit(); + agentEmitter.submit(); checkTaskStatusUpdateEventOnQueue(false, TaskState.SUBMITTED, null); - taskUpdater.startWork(); + agentEmitter.startWork(); checkTaskStatusUpdateEventOnQueue(false, TaskState.WORKING, null); - taskUpdater.requiresInput(); + agentEmitter.requiresInput(); checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, null); - taskUpdater.requiresAuth(); + agentEmitter.requiresAuth(); checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, null); // Should still be able to complete - taskUpdater.complete(); + agentEmitter.complete(); checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, null); } @Test public void testNewAgentMessage() throws Exception { - Message message = taskUpdater.newAgentMessage(SAMPLE_PARTS, null); + Message message = agentEmitter.newAgentMessage(SAMPLE_PARTS, null); assertEquals(AGENT, message.role()); assertEquals(TEST_TASK_ID, message.taskId()); @@ -254,7 +254,7 @@ public void testNewAgentMessage() throws Exception { @Test public void testNewAgentMessageWithMetadata() throws Exception { Map metadata = Map.of("key", "value"); - Message message = taskUpdater.newAgentMessage(SAMPLE_PARTS, metadata); + Message message = agentEmitter.newAgentMessage(SAMPLE_PARTS, metadata); assertEquals(AGENT, message.role()); assertEquals(TEST_TASK_ID, message.taskId()); @@ -266,7 +266,7 @@ public void testNewAgentMessageWithMetadata() throws Exception { @Test public void testAddArtifactWithAppendTrue() throws Exception { - taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, null); + agentEmitter.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, null); EventQueueItem item = eventQueue.dequeueEventItem(5000); assertNotNull(item); Event event = item.getEvent(); @@ -287,7 +287,7 @@ public void testAddArtifactWithAppendTrue() throws Exception { @Test public void testAddArtifactWithLastChunkTrue() throws Exception { - taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, null, true); + agentEmitter.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, null, true); EventQueueItem item = eventQueue.dequeueEventItem(5000); assertNotNull(item); Event event = item.getEvent(); @@ -304,7 +304,7 @@ public void testAddArtifactWithLastChunkTrue() throws Exception { @Test public void testAddArtifactWithAppendAndLastChunk() throws Exception { - taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, false); + agentEmitter.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, false); EventQueueItem item = eventQueue.dequeueEventItem(5000); assertNotNull(item); Event event = item.getEvent(); @@ -320,7 +320,7 @@ public void testAddArtifactWithAppendAndLastChunk() throws Exception { @Test public void testAddArtifactGeneratesIdWhenNull() throws Exception { - taskUpdater.addArtifact(SAMPLE_PARTS, null, "Test Artifact", null); + agentEmitter.addArtifact(SAMPLE_PARTS, null, "Test Artifact", null); EventQueueItem item = eventQueue.dequeueEventItem(5000); assertNotNull(item); Event event = item.getEvent(); @@ -340,11 +340,11 @@ public void testAddArtifactGeneratesIdWhenNull() throws Exception { @Test public void testTerminalStateProtectionAfterComplete() throws Exception { // Complete the task first - taskUpdater.complete(); + agentEmitter.complete(); checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, null); // Try to update status again - should throw RuntimeException - RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.startWork()); + RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.startWork()); assertEquals("Cannot update task status - terminal state already reached", exception.getMessage()); // Verify no additional events were queued @@ -354,11 +354,11 @@ public void testTerminalStateProtectionAfterComplete() throws Exception { @Test public void testTerminalStateProtectionAfterFail() throws Exception { // Fail the task first - taskUpdater.fail(); + agentEmitter.fail(); checkTaskStatusUpdateEventOnQueue(true, TaskState.FAILED, null); // Try to update status again - should throw RuntimeException - RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.complete()); + RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.complete()); assertEquals("Cannot update task status - terminal state already reached", exception.getMessage()); // Verify no additional events were queued @@ -368,11 +368,11 @@ public void testTerminalStateProtectionAfterFail() throws Exception { @Test public void testTerminalStateProtectionAfterReject() throws Exception { // Reject the task first - taskUpdater.reject(); + agentEmitter.reject(); checkTaskStatusUpdateEventOnQueue(true, TaskState.REJECTED, null); // Try to update status again - should throw RuntimeException - RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.startWork()); + RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.startWork()); assertEquals("Cannot update task status - terminal state already reached", exception.getMessage()); // Verify no additional events were queued @@ -382,11 +382,11 @@ public void testTerminalStateProtectionAfterReject() throws Exception { @Test public void testTerminalStateProtectionAfterCancel() throws Exception { // Cancel the task first - taskUpdater.cancel(); + agentEmitter.cancel(); checkTaskStatusUpdateEventOnQueue(true, TaskState.CANCELED, null); // Try to update status again - should throw RuntimeException - RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.submit()); + RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.submit()); assertEquals("Cannot update task status - terminal state already reached", exception.getMessage()); // Verify no additional events were queued @@ -398,7 +398,7 @@ public void testConcurrentCompletionAttempts() throws Exception { // This test simulates race condition between multiple completion attempts Thread thread1 = new Thread(() -> { try { - taskUpdater.complete(); + agentEmitter.complete(); } catch (RuntimeException e) { // Expected for one of the threads } @@ -406,7 +406,7 @@ public void testConcurrentCompletionAttempts() throws Exception { Thread thread2 = new Thread(() -> { try { - taskUpdater.fail(); + agentEmitter.fail(); } catch (RuntimeException e) { // Expected for one of the threads } diff --git a/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java b/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java index d1f5af5a6..5a14e3dc7 100644 --- a/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java +++ b/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java @@ -1,20 +1,20 @@ package io.a2a.tck.server; +import java.util.List; + import jakarta.annotation.PreDestroy; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Produces; import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; -import io.a2a.server.events.EventQueue; -import io.a2a.server.tasks.TaskUpdater; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.A2AError; import io.a2a.spec.Task; import io.a2a.spec.TaskNotCancelableError; import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatus; import io.a2a.spec.TaskStatusUpdateEvent; -import java.util.List; @ApplicationScoped public class AgentExecutorProducer { @@ -27,7 +27,7 @@ public AgentExecutor agentExecutor() { private static class FireAndForgetAgentExecutor implements AgentExecutor { @Override - public void execute(RequestContext context, EventQueue eventQueue) throws A2AError { + public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError { Task task = context.getTask(); if (task == null) { @@ -46,7 +46,7 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr .status(new TaskStatus(TaskState.SUBMITTED)) .history(List.of(context.getMessage())) .build(); - eventQueue.enqueueEvent(task); + agentEmitter.addTask(task); } // Sleep to allow task state persistence before TCK resubscribe test @@ -59,10 +59,9 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr Thread.currentThread().interrupt(); } } - TaskUpdater updater = new TaskUpdater(context, eventQueue); // Immediately set to WORKING state - updater.startWork(); + agentEmitter.startWork(); System.out.println("====> task set to WORKING, starting background execution"); // Method returns immediately - task continues in background @@ -70,7 +69,7 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr } @Override - public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError { + public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError { System.out.println("====> task cancel request received"); Task task = context.getTask(); if (task == null) { @@ -87,14 +86,7 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws A2AErro throw new TaskNotCancelableError(); } - TaskUpdater updater = new TaskUpdater(context, eventQueue); - updater.cancel(); - eventQueue.enqueueEvent(TaskStatusUpdateEvent.builder() - .taskId(task.id()) - .contextId(task.contextId()) - .status(new TaskStatus(TaskState.CANCELED)) - .isFinal(true) - .build()); + agentEmitter.cancel(); System.out.println("====> task canceled"); } diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java index 364d2275f..07e8191f8 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java @@ -7,8 +7,7 @@ import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; -import io.a2a.server.events.EventQueue; -import io.a2a.server.tasks.TaskUpdater; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.A2AError; import io.a2a.spec.InvalidParamsError; import io.a2a.spec.Message; @@ -25,23 +24,22 @@ public class AgentExecutorProducer { public AgentExecutor agentExecutor() { return new AgentExecutor() { @Override - public void execute(RequestContext context, EventQueue eventQueue) throws A2AError { - TaskUpdater updater = new TaskUpdater(context, eventQueue); + public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError { String taskId = context.getTaskId(); // Special handling for multi-event test if (taskId != null && taskId.startsWith("multi-event-test")) { // First call: context.getTask() == null (new task) if (context.getTask() == null) { - updater.startWork(); + agentEmitter.startWork(); // Return immediately - queue stays open because task is in WORKING state return; } else { // Second call: context.getTask() != null (existing task) - updater.addArtifact( + agentEmitter.addArtifact( List.of(new TextPart("Second message artifact")), "artifact-2", "Second Artifact", null); - updater.complete(); + agentEmitter.complete(); return; } } @@ -50,8 +48,8 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr if (taskId != null && taskId.startsWith("input-required-test")) { // First call: context.getTask() == null (new task) if (context.getTask() == null) { - updater.startWork(); - updater.requiresInput(updater.newAgentMessage( + agentEmitter.startWork(); + agentEmitter.requiresInput(agentEmitter.newAgentMessage( List.of(new TextPart("Please provide additional information")), context.getMessage().metadata())); // Return immediately - queue stays open because task is in INPUT_REQUIRED state @@ -62,23 +60,26 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr throw new InvalidParamsError("We didn't get the expected input"); } // Second call: context.getTask() != null (input provided) - updater.startWork(); - updater.complete(); + agentEmitter.startWork(); + agentEmitter.complete(); return; } } if (context.getTaskId().equals("task-not-supported-123")) { - eventQueue.enqueueEvent(new UnsupportedOperationError()); + agentEmitter.fail(new UnsupportedOperationError()); + } + if (context.getMessage() != null) { + agentEmitter.sendMessage(context.getMessage()); + } else { + agentEmitter.addTask(context.getTask()); } - eventQueue.enqueueEvent(context.getMessage() != null ? context.getMessage() : context.getTask()); } @Override - public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError { + public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError { if (context.getTask().id().equals("cancel-task-123")) { - TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue); - taskUpdater.cancel(); + agentEmitter.cancel(); } else if (context.getTask().id().equals("cancel-task-not-supported-123")) { throw new UnsupportedOperationError(); } diff --git a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java index afed1329f..67428d868 100644 --- a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java +++ b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java @@ -12,8 +12,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import jakarta.enterprise.inject.Instance; - import com.google.protobuf.Empty; import com.google.protobuf.Struct; import io.a2a.grpc.AuthenticationInfo; @@ -44,7 +42,7 @@ import io.a2a.server.requesthandlers.AbstractA2ARequestHandlerTest; import io.a2a.server.requesthandlers.DefaultRequestHandler; import io.a2a.server.requesthandlers.RequestHandler; -import io.a2a.server.tasks.TaskUpdater; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.AgentCapabilities; import io.a2a.spec.AgentCard; import io.a2a.spec.AgentExtension; @@ -57,7 +55,6 @@ import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.TextPart; import io.a2a.spec.UnsupportedOperationError; - import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.internal.testing.StreamRecorder; @@ -122,13 +119,12 @@ public void testOnCancelTaskSuccess() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); - agentExecutorCancel = (context, eventQueue) -> { + agentExecutorCancel = (context, agentEmitter) -> { // We need to cancel the task or the EventConsumer never finds a 'final' event. // Looking at the Python implementation, they typically use AgentExecutors that // don't support cancellation. So my theory is the Agent updates the task to the CANCEL status io.a2a.spec.Task task = context.getTask(); - TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue); - taskUpdater.cancel(); + agentEmitter.cancel(); }; CancelTaskRequest request = CancelTaskRequest.newBuilder() @@ -153,7 +149,7 @@ public void testOnCancelTaskNotSupported() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); - agentExecutorCancel = (context, eventQueue) -> { + agentExecutorCancel = (context, agentEmitter) -> { throw new UnsupportedOperationError(); }; @@ -183,8 +179,8 @@ public void testOnCancelTaskNotFound() throws Exception { @Test public void testOnMessageNewMessageSuccess() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; StreamRecorder streamRecorder = sendMessageRequest(handler); @@ -200,8 +196,8 @@ public void testOnMessageNewMessageSuccess() throws Exception { public void testOnMessageNewMessageWithExistingTaskSuccess() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; StreamRecorder streamRecorder = sendMessageRequest(handler); Assertions.assertNull(streamRecorder.getError()); @@ -215,8 +211,8 @@ public void testOnMessageNewMessageWithExistingTaskSuccess() throws Exception { @Test public void testOnMessageError() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(new UnsupportedOperationError()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.fail(new UnsupportedOperationError()); }; StreamRecorder streamRecorder = sendMessageRequest(handler); assertGrpcError(streamRecorder, Status.Code.UNIMPLEMENTED); @@ -244,8 +240,8 @@ public void testSetPushNotificationConfigSuccess() throws Exception { @Test public void testGetPushNotificationConfigSuccess() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.id() + "/pushNotificationConfigs/" + "config456"; @@ -303,8 +299,8 @@ public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception { @Test public void testOnMessageStreamNewMessageSuccess() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; StreamRecorder streamRecorder = sendStreamingMessageRequest(handler); @@ -321,8 +317,8 @@ public void testOnMessageStreamNewMessageSuccess() throws Exception { @Test public void testOnMessageStreamNewMessageExistingTaskSuccess() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; io.a2a.spec.Task task = io.a2a.spec.Task.builder(AbstractA2ARequestHandlerTest.MINIMAL_TASK) @@ -445,10 +441,10 @@ public void testOnMessageStreamNewMessageSendPushNotificationSuccess() throws Ex .build()); - agentExecutorExecute = (context, eventQueue) -> { + agentExecutorExecute = (context, agentEmitter) -> { // Hardcode the events to send here for (Event event : events) { - eventQueue.enqueueEvent(event); + agentEmitter.emitEvent(event); } }; @@ -526,8 +522,8 @@ public void testOnResubscribeExistingTaskSuccess() throws Exception { taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); queueManager.createOrTap(AbstractA2ARequestHandlerTest.MINIMAL_TASK.id()); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; StreamRecorder streamRecorder = StreamRecorder.create(); @@ -643,8 +639,8 @@ public void testOnMessageStreamInternalError() throws Exception { public void testListPushNotificationConfig() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.id() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.id(); @@ -669,8 +665,8 @@ public void testListPushNotificationConfigNotSupported() throws Exception { AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(true, false, true); GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; ListTaskPushNotificationConfigRequest request = ListTaskPushNotificationConfigRequest.newBuilder() @@ -686,8 +682,8 @@ public void testListPushNotificationConfigNoPushConfigStore() { DefaultRequestHandler requestHandler = DefaultRequestHandler.create(executor, taskStore, queueManager, null, mainEventBusProcessor, internalExecutor, internalExecutor); GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; ListTaskPushNotificationConfigRequest request = ListTaskPushNotificationConfigRequest.newBuilder() @@ -701,8 +697,8 @@ public void testListPushNotificationConfigNoPushConfigStore() { @Test public void testListPushNotificationConfigTaskNotFound() { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; ListTaskPushNotificationConfigRequest request = ListTaskPushNotificationConfigRequest.newBuilder() @@ -717,8 +713,8 @@ public void testListPushNotificationConfigTaskNotFound() { public void testDeletePushNotificationConfig() throws Exception { GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.id() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.id(); @@ -740,8 +736,8 @@ public void testDeletePushNotificationConfigNotSupported() throws Exception { AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(true, false, true); GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.id() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.id(); @@ -781,14 +777,14 @@ public void testStreamingDoesNotBlockMainThread() throws Exception { GrpcHandler.setStreamingSubscribedRunnable(streamStarted::countDown); CountDownLatch eventProcessed = new CountDownLatch(1); - agentExecutorExecute = (context, eventQueue) -> { + agentExecutorExecute = (context, agentEmitter) -> { // Wait a bit to ensure the main thread continues try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - eventQueue.enqueueEvent(context.getMessage()); + agentEmitter.sendMessage(context.getMessage()); }; // Start streaming with a custom StreamObserver @@ -949,8 +945,8 @@ public ServerCallContext create(StreamObserver streamObserver) { } }; - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; SendMessageRequest request = SendMessageRequest.newBuilder() @@ -1090,8 +1086,8 @@ public ServerCallContext create(StreamObserver streamObserver) { } }; - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; SendMessageRequest request = SendMessageRequest.newBuilder() @@ -1141,8 +1137,8 @@ public ServerCallContext create(StreamObserver streamObserver) { } }; - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; SendMessageRequest request = SendMessageRequest.newBuilder() diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java index 4dc151626..79a227431 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java @@ -45,11 +45,10 @@ import io.a2a.server.ServerCallContext; import io.a2a.server.auth.UnauthenticatedUser; import io.a2a.server.events.EventConsumer; -import io.a2a.server.events.MainEventBusProcessorCallback; import io.a2a.server.requesthandlers.AbstractA2ARequestHandlerTest; import io.a2a.server.requesthandlers.DefaultRequestHandler; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.server.tasks.ResultAggregator; -import io.a2a.server.tasks.TaskUpdater; import io.a2a.spec.AgentCapabilities; import io.a2a.spec.AgentCard; import io.a2a.spec.AgentExtension; @@ -118,13 +117,12 @@ public void testOnCancelTaskSuccess() throws Exception { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorCancel = (context, eventQueue) -> { + agentExecutorCancel = (context, agentEmitter) -> { // We need to cancel the task or the EventConsumer never finds a 'final' event. // Looking at the Python implementation, they typically use AgentExecutors that // don't support cancellation. So my theory is the Agent updates the task to the CANCEL status Task task = context.getTask(); - TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue); - taskUpdater.cancel(); + agentEmitter.cancel(); }; CancelTaskRequest request = new CancelTaskRequest("111", new TaskIdParams(MINIMAL_TASK.id())); @@ -143,7 +141,7 @@ public void testOnCancelTaskNotSupported() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorCancel = (context, eventQueue) -> { + agentExecutorCancel = (context, agentEmitter) -> { throw new UnsupportedOperationError(); }; @@ -167,8 +165,8 @@ public void testOnCancelTaskNotFound() { @Test public void testOnMessageNewMessageSuccess() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; Message message = Message.builder(MESSAGE) .taskId(MINIMAL_TASK.id()) @@ -184,8 +182,8 @@ public void testOnMessageNewMessageSuccess() { public void testOnMessageNewMessageWithExistingTaskSuccess() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; Message message = Message.builder(MESSAGE) .taskId(MINIMAL_TASK.id()) @@ -202,8 +200,8 @@ public void testOnMessageError() { // See testMessageOnErrorMocks() for a test more similar to the Python implementation, using mocks for // EventConsumer.consumeAll() JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(new UnsupportedOperationError()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.fail(new UnsupportedOperationError()); }; Message message = Message.builder(MESSAGE) .taskId(MINIMAL_TASK.id()) @@ -241,8 +239,8 @@ public void testOnMessageErrorMocks() { @Test public void testOnMessageStreamNewMessageSuccess() throws InterruptedException { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; Message message = Message.builder(MESSAGE) @@ -322,13 +320,13 @@ public void testOnMessageStreamNewMessageMultipleEventsSuccess() throws Interrup .build(); // Configure the agent executor to enqueue multiple events - agentExecutorExecute = (context, eventQueue) -> { + agentExecutorExecute = (context, agentEmitter) -> { // Enqueue the task with WORKING state - eventQueue.enqueueEvent(taskEvent); + agentEmitter.emitEvent(taskEvent); // Enqueue an artifact update event - eventQueue.enqueueEvent(artifactEvent); + agentEmitter.emitEvent(artifactEvent); // Enqueue a status update event to complete the task (this is the "final" event) - eventQueue.enqueueEvent(statusEvent); + agentEmitter.emitEvent(statusEvent); }; Message message = Message.builder(MESSAGE) @@ -489,8 +487,8 @@ public void onComplete() { @Test public void testOnMessageStreamNewMessageExistingTaskSuccess() throws Exception { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; Task task = Task.builder(MINIMAL_TASK) @@ -663,8 +661,8 @@ public void testSetPushNotificationConfigSuccess() { public void testGetPushNotificationConfigSuccess() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; TaskPushNotificationConfig taskPushConfig @@ -714,10 +712,10 @@ public void testOnMessageStreamNewMessageSendPushNotificationSuccess() throws Ex .build()); - agentExecutorExecute = (context, eventQueue) -> { + agentExecutorExecute = (context, agentEmitter) -> { // Hardcode the events to send here for (Event event : events) { - eventQueue.enqueueEvent(event); + agentEmitter.emitEvent(event); } }; @@ -807,10 +805,10 @@ public void testOnResubscribeExistingTaskSuccess() { taskStore.save(MINIMAL_TASK, false); queueManager.createOrTap(MINIMAL_TASK.id()); - agentExecutorExecute = (context, eventQueue) -> { + agentExecutorExecute = (context, agentEmitter) -> { // The only thing hitting the agent is the onMessageSend() and we should use the message - eventQueue.enqueueEvent(context.getMessage()); - //eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentEmitter.sendMessage(context.getMessage()); + //agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; SubscribeToTaskRequest request = new SubscribeToTaskRequest("1", new TaskIdParams(MINIMAL_TASK.id())); @@ -1262,8 +1260,8 @@ public void testOnMessageSendTaskIdMismatch() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorExecute = ((context, eventQueue) -> { - eventQueue.enqueueEvent(MINIMAL_TASK); + agentExecutorExecute = ((context, agentEmitter) -> { + agentEmitter.emitEvent(MINIMAL_TASK); }); SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(MESSAGE, null, null)); @@ -1278,8 +1276,8 @@ public void testOnMessageStreamTaskIdMismatch() throws InterruptedException { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorExecute = ((context, eventQueue) -> { - eventQueue.enqueueEvent(MINIMAL_TASK); + agentExecutorExecute = ((context, agentEmitter) -> { + agentEmitter.emitEvent(MINIMAL_TASK); }); SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null)); @@ -1329,8 +1327,8 @@ public void onComplete() { public void testListPushNotificationConfig() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; TaskPushNotificationConfig taskPushConfig @@ -1360,8 +1358,8 @@ public void testListPushNotificationConfigNotSupported() { AgentCard card = createAgentCard(true, false, true); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; TaskPushNotificationConfig taskPushConfig @@ -1388,8 +1386,8 @@ public void testListPushNotificationConfigNoPushConfigStore() { DefaultRequestHandler requestHandler = DefaultRequestHandler.create(executor, taskStore, queueManager, null, mainEventBusProcessor, internalExecutor, internalExecutor); JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; ListTaskPushNotificationConfigRequest listRequest @@ -1405,8 +1403,8 @@ public void testListPushNotificationConfigNoPushConfigStore() { @Test public void testListPushNotificationConfigTaskNotFound() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; ListTaskPushNotificationConfigRequest listRequest @@ -1423,8 +1421,8 @@ public void testListPushNotificationConfigTaskNotFound() { public void testDeletePushNotificationConfig() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; TaskPushNotificationConfig taskPushConfig @@ -1451,8 +1449,8 @@ public void testDeletePushNotificationConfigNotSupported() { AgentCard card = createAgentCard(true, false, true); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; TaskPushNotificationConfig taskPushConfig @@ -1480,8 +1478,8 @@ public void testDeletePushNotificationConfigNoPushConfigStore() { DefaultRequestHandler.create(executor, taskStore, queueManager, null, mainEventBusProcessor, internalExecutor, internalExecutor); JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage()); }; TaskPushNotificationConfig taskPushConfig @@ -1523,14 +1521,14 @@ public void testStreamingDoesNotBlockMainThread() throws Exception { CountDownLatch streamStarted = new CountDownLatch(1); CountDownLatch eventProcessed = new CountDownLatch(1); - agentExecutorExecute = (context, eventQueue) -> { + agentExecutorExecute = (context, agentEmitter) -> { // Wait a bit to ensure the main thread continues try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - eventQueue.enqueueEvent(context.getMessage()); + agentEmitter.sendMessage(context.getMessage()); }; Message message = Message.builder(MESSAGE) @@ -1734,8 +1732,8 @@ public void testRequiredExtensionProvidedSuccess() { requestedExtensions ); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; Message message = Message.builder(MESSAGE) @@ -1892,8 +1890,8 @@ public void testCompatibleVersionSuccess() { "1.1" // Compatible version (same major version) ); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; Message message = Message.builder(MESSAGE) @@ -1928,8 +1926,8 @@ public void testNoVersionDefaultsToCurrentVersionSuccess() { JSONRPCHandler handler = new JSONRPCHandler(agentCard, requestHandler, internalExecutor); // Use default callContext (no version - should default to 1.0) - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; Message message = Message.builder(MESSAGE) diff --git a/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java b/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java index db3ff97aa..80c7221e7 100644 --- a/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java +++ b/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java @@ -14,7 +14,7 @@ import io.a2a.server.ServerCallContext; import io.a2a.server.auth.UnauthenticatedUser; import io.a2a.server.requesthandlers.AbstractA2ARequestHandlerTest; -import io.a2a.server.tasks.TaskUpdater; +import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.AgentCapabilities; import io.a2a.spec.AgentCard; import io.a2a.spec.AgentExtension; @@ -84,8 +84,8 @@ public void testListTasksInvalidStatus() { @Test public void testSendMessage() throws InvalidProtocolBufferException { RestHandler handler = new RestHandler(CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; String requestBody = """ { @@ -164,13 +164,12 @@ public void testCancelTaskSuccess() { RestHandler handler = new RestHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK, false); - agentExecutorCancel = (context, eventQueue) -> { + agentExecutorCancel = (context, agentEmitter) -> { // We need to cancel the task or the EventConsumer never finds a 'final' event. // Looking at the Python implementation, they typically use AgentExecutors that // don't support cancellation. So my theory is the Agent updates the task to the CANCEL status Task task = context.getTask(); - TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue); - taskUpdater.cancel(); + agentEmitter.cancel(); }; RestHandler.HTTPRestResponse response = handler.cancelTask(MINIMAL_TASK.id(), "", callContext); @@ -194,8 +193,8 @@ public void testCancelTaskNotFound() { @Test public void testSendStreamingMessageSuccess() { RestHandler handler = new RestHandler(CARD, requestHandler, internalExecutor); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; String requestBody = """ { @@ -360,14 +359,14 @@ public void testStreamingDoesNotBlockMainThread() throws Exception { AtomicBoolean eventReceived = new AtomicBoolean(false); CountDownLatch streamStarted = new CountDownLatch(1); CountDownLatch eventProcessed = new CountDownLatch(1); - agentExecutorExecute = (context, eventQueue) -> { + agentExecutorExecute = (context, agentEmitter) -> { // Wait a bit to ensure the main thread continues try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - eventQueue.enqueueEvent(context.getMessage()); + agentEmitter.sendMessage(context.getMessage()); }; String requestBody = """ @@ -608,8 +607,8 @@ public void testRequiredExtensionProvidedSuccess() { requestedExtensions ); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; String requestBody = """ @@ -802,8 +801,8 @@ public void testCompatibleVersionSuccess() { "1.1" // Compatible version (same major version) ); - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; String requestBody = """ @@ -850,8 +849,8 @@ public void testNoVersionDefaultsToCurrentVersionSuccess() { RestHandler handler = new RestHandler(agentCard, requestHandler, internalExecutor); // Use default callContext (no version - should default to 1.0) - agentExecutorExecute = (context, eventQueue) -> { - eventQueue.enqueueEvent(context.getMessage()); + agentExecutorExecute = (context, agentEmitter) -> { + agentEmitter.sendMessage(context.getMessage()); }; String requestBody = """