Skip to content

feat!: Use AgentEmitter instead of EventQueue in AgentExecutor methods#621

Open
kabir wants to merge 1 commit intoa2aproject:mainfrom
kabir:event-queue-bus-round2
Open

feat!: Use AgentEmitter instead of EventQueue in AgentExecutor methods#621
kabir wants to merge 1 commit intoa2aproject:mainfrom
kabir:event-queue-bus-round2

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Feb 4, 2026

BREAKING CHANGE: AgentEmitter contains the methods from the old TaskUpdater
and is now the only way agents send results back to the caller. This hides
the EventQueue mechanism from users.

Also introduced a check that when placing a full Task object on the queue,
which should only be done for calls with no existing Task, that the
Task's ID is the one expected for the queue, as calculated by the
RequestContext.

Added Message and Task builders to AgentEmitter to help with using the
proper taskID and contextId.

Fixes #604 🦕

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the internal event processing and agent communication mechanisms to improve robustness, consistency, and developer experience. By introducing a dedicated AgentEmitter and a centralized MainEventBusProcessor, the system ensures that all agent-generated events are reliably persisted before being delivered to clients. This architectural shift addresses potential race conditions, streamlines event flow, and provides a more intuitive API for agent developers, albeit with a breaking change to AgentExecutor method signatures.

Highlights

  • Agent Communication Refactor: The TaskUpdater class has been removed and its functionality, along with direct EventQueue usage, has been consolidated into a new AgentEmitter class. This provides a cleaner, more centralized API for agents to send messages, update task status, and stream artifacts.
  • Breaking Change: This pull request introduces a breaking change as AgentExecutor methods now accept AgentEmitter instead of EventQueue, requiring updates to existing agent implementations.
  • Enhanced Event Processing Pipeline: A new MainEventBus and MainEventBusProcessor have been introduced to centralize event persistence and distribution. Events are now persisted to the TaskStore before being distributed to client queues, ensuring data consistency and reliability, especially in replicated environments.
  • Improved Task ID Validation: New checks ensure that when a full Task object is placed on the queue, its ID matches the expected ID for the RequestContext, preventing inconsistencies.
  • Client Disconnect Handling: Transport layers (gRPC, JSON-RPC/REST over SSE) now explicitly detect client disconnects and invoke a cancellation callback on the EventConsumer, allowing for graceful shutdown of polling loops and resource cleanup.
  • Optimized Thread Pool Management: The asynchronous executor for agent tasks now uses a bounded ArrayBlockingQueue and allows core threads to time out, preventing thread pool exhaustion and improving resource utilization, particularly in high-concurrency streaming scenarios.
  • Resubscription Protocol Adherence: The onResubscribeToTask method now strictly adheres to A2A Protocol Spec 3.1.6 by ensuring the current Task object is always the first event sent to a resubscribing client.
Changelog
  • README.md
    • Updated example code to reflect the new AgentEmitter usage.
  • WeatherAgentExecutorProducer.java
    • Replaced direct EventQueue and TaskUpdater usage with the new AgentEmitter.
  • client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java
    • Enhanced logic to close SSE channels when a Task object with a final state is received, in addition to TaskStatusUpdateEvent.
  • client/transport/rest/src/main/java/io/a2a/client/transport/rest/sse/RestSSEEventListener.java
    • Extended SSE channel closing logic to handle Task objects in final states, not just TaskStatusUpdateEvent.
  • examples/cloud-deployment/README.md
    • Updated example code snippets to use AgentEmitter for task updates.
  • examples/cloud-deployment/scripts/deploy.sh
    • Added a kubectl wait loop to ensure PostgreSQL pod creation before proceeding with deployment, improving script reliability.
  • examples/cloud-deployment/server/src/main/java/io/a2a/examples/cloud/CloudAgentExecutorProducer.java
    • Migrated agent execution logic from EventQueue and TaskUpdater to AgentEmitter.
  • examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentExecutorProducer.java
    • Updated AgentExecutor implementation to use AgentEmitter.sendMessage instead of direct EventQueue.enqueueEvent.
  • extras/common/src/main/java/io/a2a/extras/common/events/TaskFinalizedEvent.java
    • Modified TaskFinalizedEvent to include the full Task object, ensuring complete state is available for replication.
  • extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java
    • Added a new line at the end of the file for formatting consistency.
  • extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreTestAgentExecutor.java
    • Refactored test agent executor to utilize AgentEmitter for task state updates and artifact additions.
  • extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedEventQueueItem.java
    • Introduced isTaskEvent() method to identify Task events for special handling in replication.
  • extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java
    • Updated constructor to inject MainEventBus for centralized event processing.
    • Modified onReplicatedEvent to correctly handle Task events and ensure MainQueue creation for late-arriving events.
    • Adjusted onTaskFinalized to send a TaskStatusUpdateEvent (derived from the final Task) before the QueueClosedEvent to maintain event ordering.
    • Updated ReplicatingEventQueueFactory to use createBaseEventQueueBuilder for consistent queue setup.
  • extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java
    • Extensively updated test setup and teardown to manage MainEventBus and MainEventBusProcessor lifecycle.
    • Introduced waitForEventProcessing and dequeueEventWithRetry helpers for deterministic testing of asynchronous event flows.
    • Modified test cases to align with the new event processing and replication logic, including updated assertions for event counts.
  • extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java
    • Added a new utility class to provide static helpers for MainEventBusProcessor lifecycle management in tests.
  • extras/queue-manager-replicated/tests-multi-instance/quarkus-app-1/src/main/resources/application.properties
    • Added debug logging categories for io.a2a.server.tasks and io.a2a.extras.taskstore.database.jpa.
  • extras/queue-manager-replicated/tests-multi-instance/quarkus-app-2/src/main/resources/application.properties
    • Added debug logging categories for io.a2a.server.tasks and io.a2a.extras.taskstore.database.jpa.
  • extras/queue-manager-replicated/tests-multi-instance/quarkus-common/src/main/java/io/a2a/extras/queuemanager/replicated/tests/multiinstance/MultiInstanceReplicationAgentExecutor.java
    • Updated agent executor to use AgentEmitter for all task-related operations.
  • extras/queue-manager-replicated/tests-multi-instance/tests/src/test/java/io/a2a/extras/queuemanager/replicated/tests/multiinstance/MultiInstanceReplicationTest.java
    • Added testFailure and container log dumping for improved debugging of multi-instance tests.
    • Modified event consumers to filter initial TaskEvent on resubscribe, aligning with A2A spec 3.1.6.
  • extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java
    • Updated event consumer logic to enforce that the first event on resubscribe is a TaskEvent, as per A2A spec 3.1.6.
  • extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/ReplicationTestAgentExecutor.java
    • Migrated agent executor to use AgentEmitter for all task state changes.
  • extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java
    • Modified the save method to accept an isReplicated flag, preventing feedback loops in replicated scenarios.
    • Implemented logic to fire TaskFinalizedEvent only for locally-generated final states, passing the full Task object.
  • extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java
    • Updated all taskStore.save calls to include the isReplicated=false flag for local operations.
  • extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTestAgentExecutor.java
    • Converted agent executor to use AgentEmitter for task updates.
  • reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java
    • Removed ReactiveRoutes import.
    • Integrated SseFormatter for standardized Server-Sent Events (SSE) formatting.
    • Implemented ServerCallContext.setEventConsumerCancelCallback to handle client disconnects and stop EventConsumer polling loops.
  • reference/jsonrpc/src/test/resources/application.properties
    • Added debug logging categories for io.a2a.server.events, io.a2a.server.requesthandlers, and io.a2a.server.tasks.
  • reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java
    • Removed ReactiveRoutes import.
    • Integrated SseFormatter for standardized Server-Sent Events (SSE) formatting.
    • Added ServerCallContext.setEventConsumerCancelCallback to handle client disconnects and stop EventConsumer polling loops.
    • Configured SSE headers (Cache-Control, X-Accel-Buffering) and Vert.x write queue size for immediate flushing.
  • server-common/src/main/java/io/a2a/server/ServerCallContext.java
    • Added eventConsumerCancelCallback field and associated set and invoke methods for client disconnect handling.
  • server-common/src/main/java/io/a2a/server/agentexecution/AgentExecutor.java
    • Updated execute and cancel methods to use AgentEmitter instead of EventQueue, along with corresponding Javadoc updates.
  • server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java
    • Updated Javadoc examples to reflect the new AgentEmitter usage.
  • server-common/src/main/java/io/a2a/server/events/EventConsumer.java
    • Introduced cancelled flag and cancel() method to allow external cancellation of the polling loop.
    • Added agentCompleted flag and a grace period for polling after agent completion to accommodate replicated events.
    • Implemented a small Thread.sleep(50) after sending final events to allow SSE buffers to flush before stream completion.
  • server-common/src/main/java/io/a2a/server/events/EventQueue.java
    • Refactored MainQueue to submit events to a new MainEventBus instead of directly managing a BlockingQueue and Semaphore.
    • Introduced ChildQueue with its own BlockingQueue for local consumption.
    • Added validateEventIds to MainQueue to ensure taskId consistency for events.
    • Modified EventQueue.builder() to require MainEventBus for proper initialization.
    • Added enqueueLocalOnly methods for ChildQueue to handle already-persisted events.
    • Removed direct dequeue operations from MainQueue, enforcing consumption only through ChildQueue instances.
    • Added abstract size() method for queue size retrieval.
  • server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java
    • Updated constructor to inject MainEventBus.
    • Modified getEventQueueBuilder and createBaseEventQueueBuilder to correctly configure MainEventBus.
  • server-common/src/main/java/io/a2a/server/events/MainEventBus.java
    • New file: Implements a central blocking queue for all events, acting as the single entry point for event processing.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java
    • New file: A record class to encapsulate event context for the MainEventBus.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
    • New file: A background processor that consumes events from MainEventBus, persists them to TaskStore, distributes them to ChildQueues, and triggers replication/push notifications.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java
    • New file: Defines a callback interface for MainEventBusProcessor events, primarily for testing synchronization.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java
    • New file: A CDI initializer to ensure eager startup of MainEventBusProcessor.
  • server-common/src/main/java/io/a2a/server/events/QueueManager.java
    • Updated default getEventQueueBuilder and added createBaseEventQueueBuilder methods to ensure MainEventBus is provided.
  • server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
    • Injected MainEventBusProcessor and a dedicated EventConsumerExecutor.
    • Removed backgroundTasks tracking, as cleanup is now integrated with the new event pipeline.
    • Refactored onCancelTask, onMessageSend, onMessageSendStream, and onResubscribeToTask to use AgentEmitter and the new event processing flow.
    • Removed direct sendPushNotification calls, as push notifications are now handled by MainEventBusProcessor.
    • Updated blocking/non-blocking logic to leverage MainEventBusProcessor for persistence guarantees.
    • Introduced THREAD_STATS_LOGGER for detailed diagnostic logging of thread pool activity.
    • Refined initMessageSend to ensure consistent taskId generation and handling.
  • server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java
    • New file: Replaces TaskUpdater, providing a simplified and consistent API for agents to emit events, manage task lifecycle, and send messages.
  • server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java
    • Modified the save method to accept an isReplicated flag, aligning with JpaDatabaseTaskStore.
  • server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
    • Injected EventConsumerExecutor to isolate event consumption to a dedicated thread pool.
    • Removed direct taskManager.process calls, as persistence is now handled by MainEventBusProcessor.
    • Modified consumeAndEmit to ensure subscription happens on the eventConsumerExecutor.
    • Updated consumeAndBreakOnInterrupt to use EventConsumerExecutor and capture Task events directly from the queue.
  • server-common/src/main/java/io/a2a/server/tasks/TaskManager.java
    • Modified process and saveTaskEvent methods to accept an isReplicated flag and return a boolean indicating if the task reached a final state.
    • Changed InvalidParamsError to InternalError for taskId mismatch scenarios.
  • server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
    • Modified the save method signature to include an isReplicated flag.
  • server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java
    • Removed: Functionality replaced by AgentEmitter.
  • server-common/src/main/java/io/a2a/server/util/async/AsyncExecutorProducer.java
    • Switched from LinkedBlockingQueue to ArrayBlockingQueue for the executor's work queue, enabling proper thread pool growth.
    • Added a2a.executor.queue-capacity configuration property.
    • Enabled allowCoreThreadTimeOut to reclaim idle core threads, improving resource management.
    • Added logPoolStats for diagnostic monitoring of the executor pool.
  • server-common/src/main/java/io/a2a/server/util/async/EventConsumerExecutorProducer.java
    • New file: Provides a dedicated cached thread pool for EventConsumer polling, preventing deadlocks and improving concurrency.
  • server-common/src/main/java/io/a2a/server/util/sse/SseFormatter.java
    • New file: A framework-agnostic utility for formatting A2A responses as Server-Sent Events (SSE).
  • server-common/src/main/java/io/a2a/server/util/sse/package-info.java
    • New file: Package information for SSE formatting utilities.
  • server-common/src/main/resources/META-INF/a2a-defaults.properties
    • Added a2a.executor.queue-capacity property with a default value of 100.
  • server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java
    • Updated tests to integrate MainEventBusProcessor for event processing and waitForEventProcessing for synchronization.
  • server-common/src/test/java/io/a2a/server/events/EventQueueTest.java
    • Updated tests to use MainEventBusProcessor and waitForEventProcessing for asynchronous event handling.
  • server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java
    • Updated to provide MainEventBus and MainEventBusProcessor lifecycle management for tests.
  • server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java
    • Updated tests to integrate MainEventBusProcessor for event processing.
  • server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
    • Updated AgentExecutorMethod to use AgentEmitter.
    • Modified init and cleanup methods to manage the lifecycle of MainEventBusProcessor.
  • server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java
    • Removed: Tests have been refactored and integrated into AbstractA2ARequestHandlerTest or other specific test classes.
  • server-common/src/test/java/io/a2a/server/tasks/AgentEmitterTest.java
    • Renamed from TaskUpdaterTest.java.
    • Updated tests to use the new AgentEmitter class and integrate MainEventBusProcessor for event processing.
  • server-common/src/test/java/io/a2a/server/tasks/InMemoryTaskStoreTest.java
    • Removed: Tests have been integrated into JpaDatabaseTaskStoreTest or are no longer necessary due to architectural changes.
  • server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java
    • Updated constructor to accept EventConsumerExecutor.
    • Removed direct calls to taskManager.process, as persistence is now handled by MainEventBusProcessor.
    • Modified consumeAndBreakNonBlocking to use EventConsumerExecutor for subscription.
  • server-common/src/test/java/io/a2a/server/tasks/TaskManagerTest.java
    • Updated saveTask calls to include the isReplicated=false flag.
    • Changed expected error type from InvalidParamsError to InternalError for taskId mismatch.
  • tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java
    • Updated AgentExecutor implementation to use AgentEmitter for all task-related operations.
  • tck/src/main/resources/application.properties
    • Added io.a2a.server.diagnostics.ThreadStats.level=DEBUG for detailed thread pool logging.
  • tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
    • Modified resubscribe consumers to enforce that the first event received is a TaskEvent, as per A2A spec 3.1.6.
  • tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
    • Updated AgentExecutor implementation to use AgentEmitter for task updates and message sending.
  • tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java
    • Updated saveTask method to include the isReplicated=false flag.
  • transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java
    • Modified convertToStreamResponse to accept ServerCallContext.
    • Implemented Context.CancellationListener to detect gRPC client disconnects and invoke EventConsumer.cancel().
    • Added error handling for ExtendedAgentCardNotConfiguredError to prevent hanging when extended agent card is not configured.
  • transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
    • Updated taskStore.save calls to include isReplicated=false.
    • Modified agentExecutorCancel and agentExecutorExecute to use AgentEmitter.
    • Adjusted resubscribe test to expect two events (initial Task + agent message) per A2A spec.
  • transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
    • Updated taskStore.save calls to include isReplicated=false.
    • Modified agentExecutorExecute and agentExecutorCancel to use AgentEmitter.
    • Adjusted resubscribe test to expect two events (initial Task + agent message) and enforce initial Task.
    • Added polling logic to verify TaskStore finalization in multi-event streaming tests.
  • transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java
    • Added fine-grained logging to convertToSendStreamingMessageResponse for better debugging of streaming flows.
  • transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java
    • Updated taskStore.save calls to include isReplicated=false.
    • Modified agentExecutorExecute and agentExecutorCancel to use AgentEmitter.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is an excellent and significant refactoring that greatly improves the architecture and robustness of the A2A Java SDK. The introduction of the AgentEmitter provides a much cleaner and simpler API for agent developers, abstracting away the complexities of the EventQueue. The new MainEventBus architecture with its "persistence first" approach is a major step forward for data consistency and reliability, especially in replicated environments. The improvements to thread pool management, client disconnect handling, and replication logic are all critical for stability and performance. The test suite has also been made more robust and reliable. The minor suggestion regarding a comment in a test file has been retained as it does not contradict any established rules.

Comment on lines 527 to 552
// strategy.getCallCount() should be 3: testEvent, final Task, then QueueClosedEvent (poison pill)
assertEquals(3, strategy.getCallCount(), "Should have replicated testEvent, final Task, and QueueClosedEvent");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment on line 527 is slightly misleading. It states that a 'final Task' is replicated, but the implementation in ReplicatedQueueManager.onTaskFinalized actually sends a TaskStatusUpdateEvent to maintain consistency with local event distribution. The assertion on line 528 is correct (it expects 3 events), but updating the comment would improve clarity for future maintainers.

Suggested change
// strategy.getCallCount() should be 3: testEvent, final Task, then QueueClosedEvent (poison pill)
assertEquals(3, strategy.getCallCount(), "Should have replicated testEvent, final Task, and QueueClosedEvent");
// strategy.getCallCount() should be 3: testEvent, final TaskStatusUpdateEvent, then QueueClosedEvent (poison pill)
assertEquals(3, strategy.getCallCount(), "Should have replicated testEvent, final TaskStatusUpdateEvent, and QueueClosedEvent");

@kabir kabir force-pushed the event-queue-bus-round2 branch 2 times, most recently from c35e5e3 to 065787e Compare February 4, 2026 14:51
BREAKING CHANGE: AgentEmitter contains the methods from the old TaskUpdater
and is now the only way agents send results back to the caller. This hides
the EventQueue mechanism from users.

Also introduced a check that when placing a full Task object on the queue,
which should only be done for calls with no existing Task, that the
Task's ID is the one expected for the queue, as calculated by the
RequestContext.

Added Message and Task builders to AgentEmitter to help with using the
proper taskID and contextId.
@kabir kabir force-pushed the event-queue-bus-round2 branch from 065787e to 8bb5d93 Compare February 4, 2026 17:42
@kabir
Copy link
Collaborator Author

kabir commented Feb 4, 2026

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant and beneficial refactoring by replacing the TaskUpdater with a new AgentEmitter class. This change provides a cleaner and more powerful API for agent developers, encapsulating task lifecycle management and message sending in a single place. The new AgentEmitter is well-documented and includes helpful builders for creating tasks and messages.

Key improvements include:

  • A unified AgentEmitter API for all agent-to-client communication.
  • Simplification of agent implementations, as seen in the updated examples and tests.
  • Introduction of taskId validation in the EventQueue for improved robustness.

The changes have been applied consistently throughout the codebase. I have one minor suggestion in a test file to improve the logic after a task is failed. Overall, this is an excellent pull request that improves the SDK's design and usability.

Comment on lines +70 to +71
agentEmitter.fail(new UnsupportedOperationError());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

After failing the task with agentEmitter.fail(), the execution continues and attempts to send another message or task event. This is likely not the intended behavior. You should probably add a return; statement after agentEmitter.fail() to terminate the execution for this case.

                    agentEmitter.fail(new UnsupportedOperationError());
                    return;
                }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feat]: Don't expose EventQueue in AgentExecutor

1 participant