Skip to content

Change IEventReader to return IAsyncEnumerable#522

Merged
alexeyzimarev merged 3 commits intodevfrom
feature/ieventreader-async-enumerable
Mar 11, 2026
Merged

Change IEventReader to return IAsyncEnumerable#522
alexeyzimarev merged 3 commits intodevfrom
feature/ieventreader-async-enumerable

Conversation

@alexeyzimarev
Copy link
Contributor

Summary

  • IEventReader.ReadEvents and ReadEventsBackwards now return IAsyncEnumerable<StreamEvent> instead of Task<StreamEvent[]>, with the failIfNotFound parameter removed from the interface
  • The previous signatures (with failIfNotFound, returning Task<StreamEvent[]>) are preserved as extension methods in StoreFunctions, so all downstream callers compile and work unchanged
  • All implementations updated: KurrentDB, SQL (Postgres/SqlServer/Sqlite), Redis, Elastic, InMemory, Tiered, and Traced decorators

Test plan

  • Full solution builds with 0 errors
  • Core tests pass (26/26)
  • Application tests pass (21/21)
  • Subscription tests pass (30/30)
  • DI tests pass (3/3)
  • ASP.NET Core extension tests pass (9/9)
  • Spyglass tests pass (5/5)
  • Integration tests with infrastructure (EventStoreDB, Postgres, SQL Server, Redis) should be validated in CI

🤖 Generated with Claude Code

…d arrays

The interface methods ReadEvents and ReadEventsBackwards now return
IAsyncEnumerable<StreamEvent> without the failIfNotFound parameter.
The previous signatures are preserved as extension methods on IEventReader
so all downstream callers continue to work unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@qodo-free-for-open-source-projects
Copy link
Contributor

Review Summary by Qodo

Change IEventReader to return IAsyncEnumerable instead of materialized arrays

✨ Enhancement

Grey Divider

Walkthroughs

Description
• Changed IEventReader interface to return IAsyncEnumerable<StreamEvent> instead of
  Task<StreamEvent[]>
• Removed failIfNotFound parameter from interface methods; now always throws StreamNotFound if
  stream missing
• Added extension methods in StoreFunctions preserving old signatures for backward compatibility
• Updated all implementations: KurrentDB, SQL, Redis, Elastic, InMemory, Tiered, and Traced
  decorators
Diagram
flowchart LR
  A["IEventReader Interface"] -->|"Return type changed"| B["IAsyncEnumerable&lt;StreamEvent&gt;"]
  A -->|"Parameter removed"| C["failIfNotFound removed"]
  D["StoreFunctions Extensions"] -->|"Preserve old signatures"| E["Task&lt;StreamEvent[]&gt; with failIfNotFound"]
  F["All Implementations"] -->|"Updated to new interface"| G["KurrentDB, SQL, Redis, Elastic, InMemory, Tiered, Traced"]
Loading

Grey Divider

File Changes

1. src/Core/src/Eventuous.Persistence/EventStore/IEventReader.cs ✨ Enhancement +8/-8

Update interface signatures to return async enumerable

src/Core/src/Eventuous.Persistence/EventStore/IEventReader.cs


2. src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs ✨ Enhancement +68/-0

Add extension methods for backward compatibility

src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs


3. src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/BaseTracer.cs ✨ Enhancement +17/-0

Add TraceEnumerable method for async enumerable tracing

src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/BaseTracer.cs


View more (11)
4. src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/TracedEventReader.cs ✨ Enhancement +4/-4

Update traced reader to use new interface signatures

src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/TracedEventReader.cs


5. src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/TracedEventStore.cs ✨ Enhancement +4/-4

Update traced store to use new interface signatures

src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/TracedEventStore.cs


6. src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs ✨ Enhancement +20/-12

Convert to async enumerable with yield return

src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs


7. src/Core/src/Eventuous.Persistence/EventStore/TieredEventStore.cs ✨ Enhancement +4/-4

Update tiered store to use new interface signatures

src/Core/src/Eventuous.Persistence/EventStore/TieredEventStore.cs


8. src/Core/test/Eventuous.Tests.Persistence.Base/Store/TieredStoreTests.cs 🧪 Tests +4/-4

Update test archive store implementation

src/Core/test/Eventuous.Tests.Persistence.Base/Store/TieredStoreTests.cs


9. src/KurrentDB/src/Eventuous.KurrentDB/KurrentDBEventStore.cs ✨ Enhancement +26/-38

Convert KurrentDB implementation to async enumerable

src/KurrentDB/src/Eventuous.KurrentDB/KurrentDBEventStore.cs


10. src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs ✨ Enhancement +27/-12

Convert SQL implementations to async enumerable

src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs


11. src/Redis/src/Eventuous.Redis/RedisStore.cs ✨ Enhancement +9/-4

Convert Redis implementation to async enumerable

src/Redis/src/Eventuous.Redis/RedisStore.cs


12. src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs ✨ Enhancement +14/-8

Convert Elastic implementation to async enumerable

src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs


13. src/Testing/src/Eventuous.Testing/InMemoryEventStore.cs ✨ Enhancement +13/-4

Convert in-memory store to async enumerable

src/Testing/src/Eventuous.Testing/InMemoryEventStore.cs


14. src/Extensions/test/Eventuous.Tests.DependencyInjection/Sut/FakeStore.cs 🧪 Tests +2/-2

Update fake store test implementation

src/Extensions/test/Eventuous.Tests.DependencyInjection/Sut/FakeStore.cs


Grey Divider

Qodo Logo

@qodo-free-for-open-source-projects
Copy link
Contributor

qodo-free-for-open-source-projects bot commented Mar 11, 2026

Code Review by Qodo

🐞 Bugs (4) 📘 Rule violations (0) 📎 Requirement gaps (0)

Grey Divider


Action required

1. Tracing drops read errors🐞 Bug ✓ Correctness
Description
BaseTracer.TraceEnumerable doesn’t catch exceptions thrown while enumerating the source, so
ActivityStatus.Error and measure.SetError() are never recorded on failures. Traced reads that throw
will emit spans/metrics without error information.
Code

src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/BaseTracer.cs[R51-65]

+    protected async IAsyncEnumerable<T> TraceEnumerable<T>(
+            StreamName             stream,
+            string                 operation,
+            IAsyncEnumerable<T>    source,
+            [EnumeratorCancellation] CancellationToken cancellationToken = default
+        ) {
+        using var activity = StartActivity(stream, operation);
+        using var measure  = Measure.Start(MetricsSource, new PersistenceMetricsContext(ComponentName, operation));
+
+        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) {
+            yield return item;
+        }
+
+        activity?.SetActivityStatus(ActivityStatus.Ok());
+    }
Evidence
Trace/Trace<T> wrap execution in try/catch and explicitly mark error status + metrics on exceptions,
but the new TraceEnumerable has no such handling, so failures during enumeration won’t be marked as
errors.

src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/BaseTracer.cs[19-33]
src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/BaseTracer.cs[51-65]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`BaseTracer.TraceEnumerable` doesn’t mark tracing/metrics as failed when the wrapped async enumeration throws, because it lacks the try/catch used by `Trace` and `Trace&amp;amp;lt;T&amp;amp;gt;`.
### Issue Context
This impacts traced `IEventReader` operations: exceptions during `await foreach` currently propagate without `ActivityStatus.Error` and without `measure.SetError()`.
### Fix Focus Areas
- src/Core/src/Eventuous.Persistence/Diagnostics/Tracing/BaseTracer.cs[51-65]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. Elastic throws on empty page🐞 Bug ✓ Correctness
Description
ElasticEventStore.ReadEvents/ReadEventsBackwards throw StreamNotFound when the query returns 0
events, which can also mean “no more events in this range.” This breaks StoreFunctions.ReadStream
paging by throwing on the final empty page for streams whose length is an exact multiple of the page
size.
Code

src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs[R40-44]

+    public async IAsyncEnumerable<StreamEvent> ReadEvents(StreamName stream, StreamReadPosition start, int count, [EnumeratorCancellation] CancellationToken cancellationToken) {
+        var response = await ReadEventsInternal(
          q => q.Bool(
              b => b.Must(
                  mu => mu.Term(x => x.Stream, stream.ToString()),
Evidence
ReadStream relies on an empty terminal page to stop paging; ElasticEventStore converts that empty
page into StreamNotFound unconditionally. That means a full-stream read can fail even though the
stream exists, depending on stream length and page size.

src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs[183-203]
src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs[40-55]
src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs[57-72]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`ElasticEventStore.ReadEvents` and `ReadEventsBackwards` treat an empty result as `StreamNotFound`, which breaks paging logic (e.g., `StoreFunctions.ReadStream`) that expects empty pages at end-of-stream.
### Issue Context
`ElasticEventStore` already has `StreamExists`, which can be used to disambiguate empty-range vs missing-stream.
### Fix Focus Areas
- src/Experimental/src/Eventuous.ElasticSearch/Store/ElasticEventStore.cs[40-72]
- src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs[183-203]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


3. Tiered hot-empty crash🐞 Bug ✓ Correctness
Description
TieredEventReader.ReadEvents and ReadEventsBackwards dereference hotEvents[0] even when
hotEvents.Length == 0, causing IndexOutOfRangeException. This can happen when the hot tier has no
events for the requested range (including archived-only streams).
Code

src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs[R18-27]

+        var hotEvents = await LoadStreamEvents(hotReader, start, count).NoContext();
      var archivedEvents = hotEvents.Length == 0 || hotEvents[0].Revision > start.Value
-            ? await LoadStreamEvents(archiveReader, start, (int)hotEvents[0].Revision, !failIfNotFound).NoContext()
+            ? (await LoadStreamEvents(archiveReader, start, (int)hotEvents[0].Revision).NoContext())
+                .Select(x => x with { FromArchive = true })
          : Enumerable.Empty<StreamEvent>();
-        return archivedEvents.Select(x => x with { FromArchive = true }).Concat(hotEvents).Distinct(Comparer).ToArray();
+        foreach (var evt in archivedEvents.Concat(hotEvents).Distinct(Comparer)) {
+            yield return evt;
+        }
Evidence
The condition explicitly allows hotEvents.Length == 0, but the true-branch immediately uses
hotEvents[0].Revision to compute the archive read parameters, which is invalid for an empty array.

src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs[17-27]
src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs[40-50]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`TieredEventReader` crashes with `IndexOutOfRangeException` when the hot tier returns zero events, because it still reads `hotEvents[0].Revision` in the archive-read branch.
### Issue Context
This can occur when the stream is fully archived or when the requested range has no hot events.
### Fix Focus Areas
- src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs[17-27]
- src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs[40-50]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


View more (1)
4. Tiered hides missing stream🐞 Bug ✓ Correctness
Description
TieredEventReader swallows StreamNotFound from underlying readers and returns an empty sequence,
even if the stream is missing from both hot and archive tiers. This violates IEventReader’s
documented contract to throw StreamNotFound for non-existent streams.
Code

src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs[R29-34]

+        async Task<StreamEvent[]> LoadStreamEvents(IEventReader reader, StreamReadPosition startPosition, int localCount) {
          try {
-                return await reader.ReadEvents(streamName, startPosition, localCount, !ignore, cancellationToken).NoContext();
+                return await reader.ReadEvents(streamName, startPosition, localCount, true, cancellationToken).NoContext();
          } catch (StreamNotFound) {
              return [];
          }
Evidence
The new IEventReader contract explicitly states missing streams must throw StreamNotFound, but
TieredEventReader converts StreamNotFound into an empty result inside LoadStreamEvents; if both
tiers throw, the overall result becomes empty rather than an exception.

src/Core/src/Eventuous.Persistence/EventStore/IEventReader.cs[6-23]
src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs[29-35]
src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs[52-58]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`TieredEventReader` converts `StreamNotFound` into an empty result, which violates the `IEventReader` contract that missing streams throw `StreamNotFound`.
### Issue Context
A tiered reader can still treat “missing in hot” as “try archive”, but it should throw if the stream is missing from both tiers.
### Fix Focus Areas
- src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs[29-35]
- src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs[52-58]
- src/Core/src/Eventuous.Persistence/EventStore/IEventReader.cs[6-31]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

ⓘ The new review experience is currently in Beta. Learn more

Grey Divider

Qodo Logo

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 851a6d7282

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +60 to +64
await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) {
yield return item;
}

activity?.SetActivityStatus(ActivityStatus.Ok());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Mark traced read failures in async enumeration path

TraceEnumerable never wraps the await foreach in a try/catch, so when the underlying reader throws during enumeration (for example StreamNotFound or deserialization/read exceptions), telemetry does not call measure.SetError() or set ActivityStatus.Error, unlike the existing Trace methods. This regresses observability for failed reads because failed operations can appear successful/unknown in traces and metrics.

Useful? React with 👍 / 👎.

@github-actions
Copy link

github-actions bot commented Mar 11, 2026

Test Results

   60 files  + 40     60 suites  +40   38m 51s ⏱️ + 27m 3s
  344 tests + 10    344 ✅ + 10  0 💤 ±0  0 ❌ ±0 
1 035 runs  +690  1 035 ✅ +690  0 💤 ±0  0 ❌ ±0 

Results for commit 5df61c1. ± Comparison against base commit 49e0463.

This pull request removes 5 and adds 15 tests. Note that renamed tests count towards both.
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(091f54a9-a89c-4f4f-9846-b3d3731c122f)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/11/2026 10:09:22 AM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/11/2026 10:09:22 AM)
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2026-03-11T10:09:23.2864743+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-11T10:09:23.2864743+00:00 }, CommitPosition { Position: 0, Sequence: 4, Timestamp: 2026-03-11T10:09:23.2864743+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2026-03-11T10:09:23.2864743+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-11T10:09:23.2864743+00:00 })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2026-03-11T10:09:23.2864743+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-11T10:09:23.2864743+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2026-03-11T10:09:23.2864743+00:00 }, CommitPosition { Position: 0, Sequence: 8, Timestamp: 2026-03-11T10:09:23.2864743+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-11T10:09:23.2864743+00:00 })
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/11/2026 11:11:44 AM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/11/2026 11:11:44 AM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/11/2026 11:11:47 AM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/11/2026 11:11:47 AM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/11/2026 11:12:12 AM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/11/2026 11:12:12 AM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(41c2f491-be9f-4dec-9bde-93ed52b98c25)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(5c1e837f-a812-4278-b9fb-d9f8a7565c06)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(cf4a99ff-3050-43b2-957e-658d192d08d0)
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2026-03-11T11:11:45.0624906+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-11T11:11:45.0624906+00:00 }, CommitPosition { Position: 0, Sequence: 4, Timestamp: 2026-03-11T11:11:45.0624906+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2026-03-11T11:11:45.0624906+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-11T11:11:45.0624906+00:00 })
…

♻️ This comment has been updated with latest results.

alexeyzimarev and others added 2 commits March 11, 2026 11:38
Use manual enumerator with try-catch around MoveNextAsync to properly
set ActivityStatus.Error and measure.SetError() when the underlying
async enumeration throws, matching the behavior of the existing Trace methods.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
TieredEventReader:
- Fix IndexOutOfRangeException when hot store returns no events by
  checking hotEvents.Length before accessing hotEvents[0]
- Throw StreamNotFound when neither hot nor archive tier has events,
  instead of silently returning an empty sequence

ElasticEventStore:
- Use StreamExists to distinguish "no events in range" from "stream
  does not exist", fixing false StreamNotFound on final empty page
  when stream length is an exact multiple of the page size

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@alexeyzimarev alexeyzimarev merged commit 7bc0114 into dev Mar 11, 2026
6 checks passed
@alexeyzimarev alexeyzimarev deleted the feature/ieventreader-async-enumerable branch March 11, 2026 11:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant