Skip to content

feat: Implement PushNotifications as per the 1.0 spec#622

Open
kabir wants to merge 1 commit intoa2aproject:mainfrom
kabir:event-queue-bus-improve-pnc
Open

feat: Implement PushNotifications as per the 1.0 spec#622
kabir wants to merge 1 commit intoa2aproject:mainfrom
kabir:event-queue-bus-improve-pnc

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Feb 4, 2026

Fixes #490
Fixes: #594

Includes #611 (until that is merged)

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the event processing and persistence architecture to align with the A2A 1.0 specification, particularly for streaming and push notifications. The introduction of a centralized event bus and processor ensures that all client-visible events and notifications reflect a durably stored state, enhancing consistency and reliability. Resource management for streaming connections has been improved with automatic client-side closing and server-side cancellation on disconnects. These changes, along with a more robust threading model, contribute to a more scalable and compliant system.

Highlights

  • Centralized Event Processing: Introduced a new architecture with a MainEventBus and MainEventBusProcessor to centralize event handling. This ensures all events are persisted to the TaskStore before being distributed to client-facing queues and before push notifications are sent, eliminating race conditions and ensuring data consistency.
  • Reliable Push Notifications: Push notifications are now sent asynchronously by the MainEventBusProcessor after successful persistence. They now support all StreamingEventKind types (Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent) wrapped in the A2A 1.0 StreamResponse format.
  • Improved Streaming Event Handling: Client-side SSE event listeners (JSON-RPC and REST) now automatically close connections when a Task reaches a final state. Server-side, client disconnects (HTTP/SSE closeHandler, gRPC CancellationListener) are detected, triggering cancellation of the EventConsumer polling loop to free up resources.
  • Enhanced Queue Management: The EventQueue has been refactored into MainQueue (for central event management) and ChildQueue (for client-specific consumption). This new design ensures proper event distribution, graceful handling of client disconnects, and correct lifecycle management for both active and finalized tasks.
  • Optimized Threading Model: A dedicated EventConsumerExecutor (cached thread pool) is introduced for EventConsumer polling, separating I/O-bound polling from CPU-bound agent execution. The main AsyncExecutor now uses a bounded queue and allows core thread timeouts, preventing thread pool exhaustion and improving resource utilization.
  • A2A Spec 3.1.6 Compliance: Resubscription logic now explicitly sends the current Task state as the first event in the stream, aligning with the A2A Protocol Specification 3.1.6.
Changelog
  • client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java
    • Added client-side auto-closing of SSE channels for Task objects in final states.
  • client/transport/rest/src/main/java/io/a2a/client/transport/rest/sse/RestSSEEventListener.java
    • Implemented client-side auto-closing of SSE channels for Task objects in final states.
  • examples/cloud-deployment/scripts/deploy.sh
    • Improved PostgreSQL deployment script to wait for pod creation before readiness.
  • extras/common/src/main/java/io/a2a/extras/common/events/TaskFinalizedEvent.java
    • Modified TaskFinalizedEvent to include the full Task object.
  • extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java
    • Minor formatting adjustment.
  • extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java
    • Updated tests to verify StreamingEventKind events for push notifications.
  • extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/MockPushNotificationSender.java
    • Adapted to handle StreamingEventKind events for push notifications.
  • extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedEventQueueItem.java
    • Added isTaskEvent() method for specific event type checking.
  • extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java
    • Integrated MainEventBus for event submission.
    • Ensured TaskStatusUpdateEvent is sent before QueueClosedEvent for finalized tasks in replicated environments.
    • Improved handling of replicated events for inactive tasks.
  • extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java
    • Updated tests to align with the new MainEventBusProcessor and asynchronous event handling.
  • extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java
    • Added new test utility class for EventQueue and MainEventBusProcessor.
  • extras/queue-manager-replicated/tests-multi-instance/quarkus-app-1/src/main/resources/application.properties
    • Added debug logging for task-related components.
  • extras/queue-manager-replicated/tests-multi-instance/quarkus-app-2/src/main/resources/application.properties
    • Added debug logging for task-related components.
  • extras/queue-manager-replicated/tests-multi-instance/tests/src/test/java/io/a2a/extras/queuemanager/replicated/tests/multiinstance/MultiInstanceReplicationTest.java
    • Updated multi-instance tests to filter initial TaskEvent on resubscribe and added log dumping on failure.
  • extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java
    • Enforced A2A spec 3.1.6 for resubscribe events in tests.
  • extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java
    • Modified save method to prevent firing TaskFinalizedEvent for replicated events.
  • extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java
    • Updated taskStore.save calls to include isReplicated parameter.
  • reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java
    • Refactored SSE handling to use SseFormatter and MultiSseSupport for client disconnect detection.
  • reference/jsonrpc/src/test/resources/application.properties
    • Added debug logging for event processing and request handling.
  • reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java
    • Refactored SSE handling to use SseFormatter and MultiSseSupport with specific headers for buffering control.
  • server-common/src/main/java/io/a2a/server/ServerCallContext.java
    • Added eventConsumerCancelCallback for transport-layer client disconnect handling.
  • server-common/src/main/java/io/a2a/server/events/EventConsumer.java
    • Implemented client disconnect cancellation and a grace period for replicated events.
  • server-common/src/main/java/io/a2a/server/events/EventQueue.java
    • Refactored into MainQueue and ChildQueue for hierarchical event management and asynchronous distribution.
  • server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java
    • Updated constructors and builder methods to integrate MainEventBus.
  • server-common/src/main/java/io/a2a/server/events/MainEventBus.java
    • Introduced central event bus for asynchronous, ordered event processing.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java
    • Added record for MainEventBus event context.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
    • Implemented background processor for MainEventBus to ensure "persist before distribute" logic.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java
    • Added callback interface for testing MainEventBusProcessor.
  • server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java
    • Added CDI initializer for eager startup of MainEventBusProcessor.
  • server-common/src/main/java/io/a2a/server/events/QueueManager.java
    • Updated interface to enforce MainEventBus integration in builder methods.
  • server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
    • Integrated MainEventBusProcessor and EventConsumerExecutor.
    • Refactored onMessageSend and onMessageSendStream for new event processing flow and client disconnect handling.
    • Removed background task tracking.
    • Ensured A2A spec 3.1.6 compliance for resubscribe.
  • server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java
    • Updated to send StreamingEventKind events wrapped in StreamResponse format.
  • server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java
    • Updated save method signature.
  • server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java
    • Updated interface to accept StreamingEventKind for push notifications.
  • server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
    • Updated to use eventConsumerExecutor and removed direct TaskManager.process() calls.
  • server-common/src/main/java/io/a2a/server/tasks/TaskManager.java
    • Modified process and saveTaskEvent to return isFinal status and accept isReplicated.
  • server-common/src/main/java/io/a2a/server/tasks/TaskStore.java
    • Updated save method signature.
  • server-common/src/main/java/io/a2a/server/util/async/AsyncExecutorProducer.java
    • Configured ThreadPoolExecutor with a bounded queue and core thread timeout for better resource management.
  • server-common/src/main/java/io/a2a/server/util/async/EventConsumerExecutorProducer.java
    • Added dedicated cached thread pool for EventConsumer polling.
  • server-common/src/main/java/io/a2a/server/util/sse/SseFormatter.java
    • Added utility for formatting SSE events.
  • server-common/src/main/java/io/a2a/server/util/sse/package-info.java
    • Added package-info for SSE utilities.
  • server-common/src/main/resources/META-INF/a2a-defaults.properties
    • Added a2a.executor.queue-capacity property.
  • server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java
    • Updated tests for new asynchronous event processing.
  • server-common/src/test/java/io/a2a/server/events/EventQueueTest.java
    • Updated tests for new asynchronous event processing and MainQueue/ChildQueue structure.
  • server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java
    • Updated test utility for EventQueue builder and MainEventBusProcessor.
  • server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java
    • Updated tests for new MainEventBus integration.
  • server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
    • Updated test setup for MainEventBusProcessor and TestHttpClient to handle StreamingEventKind.
  • server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java
    • Removed test file.
  • server-common/src/test/java/io/a2a/server/tasks/InMemoryTaskStoreTest.java
    • Removed test file.
  • server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java
    • Updated tests to verify StreamingEventKind push notifications.
  • server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java
    • Updated tests for new asynchronous event processing and EventConsumerExecutor.
  • server-common/src/test/java/io/a2a/server/tasks/TaskManagerTest.java
    • Updated tests for isReplicated parameter and isFinal return.
  • server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java
    • Updated tests for new asynchronous event processing.
  • tck/src/main/resources/application.properties
    • Added debug logging for thread statistics.
  • tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
    • Updated tests to enforce A2A spec 3.1.6 for resubscribe events.
  • tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java
    • Updated saveTask to include isReplicated parameter.
  • transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java
    • Implemented client disconnect detection for gRPC streams and EventConsumer cancellation.
  • transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
    • Updated tests for isReplicated parameter and A2A spec 3.1.6 compliance.
  • transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
    • Updated tests for isReplicated parameter, asynchronous event processing, and A2A spec 3.1.6 compliance.
  • transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java
    • Added fine-grained logging for streaming event processing.
  • transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java
    • Updated tests for isReplicated parameter.
Activity
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant and well-executed architectural refactoring of the server's event processing and queueing system. The introduction of a centralized MainEventBus and a dedicated MainEventBusProcessor is a major improvement, enforcing a "persist-before-visibility" pattern that enhances robustness and eliminates potential race conditions. The changes also address critical issues like resource leaks on client disconnects and thread pool exhaustion under load, and improve compliance with the A2A specification for push notifications and event streaming. The code quality is high, with clear documentation for the new architecture. I have a couple of minor suggestions regarding code duplication in the client and a potential inconsistency in the replicated event handling logic, but overall this is an excellent set of changes.

Comment on lines 72 to 80
boolean shouldClose = false;
if (event instanceof TaskStatusUpdateEvent tue && tue.isFinal()) {
shouldClose = true;
} else if (event instanceof Task task) {
TaskState state = task.status().state();
if (state.isFinal()) {
shouldClose = true;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This logic for determining if the connection should be closed based on a final event state is duplicated in client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java. To improve maintainability and avoid future inconsistencies, consider extracting this logic into a shared static utility method. For example, a method StreamingEventUtils.isFinal(StreamingEventKind event) could be placed in a common client module.

Comment on lines 147 to 157
* IMPORTANT: We send TaskStatusUpdateEvent instead of full Task to maintain consistency
* with local event distribution. Clients expect TaskStatusUpdateEvent for status changes,
* and sending the full Task causes issues in remote instances where clients don't handle
* bare Task objects the same way they handle TaskStatusUpdateEvent.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment here states that sending a full Task object can cause issues for clients. However, other changes in this PR, specifically in SSEEventListener and RestSSEEventListener, add logic to handle receiving a Task with a final state to close the connection. This suggests clients are now expected to handle Task objects in the stream. If clients can indeed handle final Task objects, it might be simpler and more consistent to replicate the final Task directly from the TaskFinalizedEvent, rather than converting it to a TaskStatusUpdateEvent. This would also make the event flow more uniform. If the conversion is still necessary for other reasons, could the comment be updated to clarify the specific issues that remain?

@kabir kabir force-pushed the event-queue-bus-improve-pnc branch from 62e99c5 to 9414bfb Compare February 4, 2026 17:43
@kabir
Copy link
Collaborator Author

kabir commented Feb 4, 2026

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the push notification mechanism to align with the 1.0 spec, allowing any StreamingEventKind (like Task, Message, TaskStatusUpdateEvent, and TaskArtifactUpdateEvent) to be sent as a notification, instead of just Task snapshots. The changes are well-implemented across the PushNotificationSender interface and its base implementation, ensuring events are serialized correctly into the StreamResponse format. The tests have been comprehensively updated to validate the new behavior for all event types, and documentation has been improved to reflect these changes. The overall implementation is solid and enhances the push notification feature significantly. I have one minor suggestion to improve the extensibility of the BasePushNotificationSender.

* @param event the streaming event
* @return the task ID, or null if not available
*/
private @Nullable String extractTaskId(StreamingEventKind event) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To improve extensibility, consider changing the visibility of extractTaskId from private to protected. This would allow custom PushNotificationSender implementations that extend BasePushNotificationSender to reuse this helpful utility method. The Javadoc example in the PushNotificationSender interface already implies the existence of such a reusable helper.

Suggested change
private @Nullable String extractTaskId(StreamingEventKind event) {
protected @Nullable String extractTaskId(StreamingEventKind event) {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

1 participant