Skip to content

feat: add multi-stream produce to IProducer#512

Merged
alexeyzimarev merged 8 commits intodevfrom
feat/multi-stream-produce
Mar 9, 2026
Merged

feat: add multi-stream produce to IProducer#512
alexeyzimarev merged 8 commits intodevfrom
feat/multi-stream-produce

Conversation

@alexeyzimarev
Copy link
Contributor

Summary

  • Add ProduceRequest / ProduceRequest<TProduceOptions> record structs as input types for multi-stream produce
  • Add default interface method Produce(IReadOnlyCollection<ProduceRequest>) on IProducer with parallel execution via Task.WhenAll
  • Add Produce(IReadOnlyCollection<ProduceRequest<T>>) on BaseProducer<T> (can't go on IProducer<in T> due to variance constraints)
  • Simplify GatewayHandler to use single multi-stream Produce call instead of GroupBy + parallel per-stream calls
  • Update GatewayProducer to delegate multi-stream produce to inner producer

Design notes

The generic ProduceRequest<TProduceOptions> overload cannot live on IProducer<in TProduceOptions> because the contravariant (in) type parameter causes a CS1961 variance error when used in method parameters. The overload lives on BaseProducer<T> instead, which all concrete producers inherit from.

Individual producer implementations (Kafka, RabbitMQ, PubSub, Service Bus) inherit the default parallel behavior and can override later for optimization.

Test plan

  • 4 new tests in MultiStreamProduceTests: multi-stream, empty requests, same-stream batch, untyped requests
  • Full solution builds with 0 errors
  • Gateway tests: 10/10 passing
  • Core tests: 26/26 passing

🤖 Generated with Claude Code

alexeyzimarev and others added 5 commits March 9, 2026 16:41
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add ProduceRequest record structs and multi-stream Produce overloads
that delegate to existing single-stream methods with parallel execution.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add multi-stream delegation methods to GatewayProducer (both generic
ProduceRequest<T> and non-generic ProduceRequest overloads). Update
GatewayHandler to batch messages by target stream into ProduceRequest
collections instead of producing per-message, reducing overhead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests validate producing to multiple streams, empty requests,
multiple messages to same stream, and untyped produce requests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@qodo-free-for-open-source-projects
Copy link
Contributor

Review Summary by Qodo

Add multi-stream produce support to IProducer and update Gateway

✨ Enhancement

Grey Divider

Walkthroughs

Description
• Add ProduceRequest and ProduceRequest<T> record structs for batch multi-stream operations
• Implement multi-stream Produce overloads on IProducer and BaseProducer<T> with parallel
  execution
• Simplify GatewayHandler to batch messages by stream and use single multi-stream produce call
• Add delegation methods to GatewayProducer for multi-stream produce support
• Include comprehensive tests validating multi-stream, empty requests, and same-stream batching
Diagram
flowchart LR
  A["ProduceRequest<br/>Record Structs"] -->|"Define input types"| B["IProducer<br/>Multi-stream Methods"]
  B -->|"Implement with<br/>parallel execution"| C["BaseProducer<T><br/>Override Methods"]
  C -->|"Delegate to"| D["Single-stream<br/>Produce"]
  E["GatewayHandler<br/>GroupBy Messages"] -->|"Refactor to use"| F["Multi-stream<br/>Produce Call"]
  G["GatewayProducer<br/>Delegation Methods"] -->|"Support"| F
  H["MultiStreamProduceTests<br/>Test Suite"] -->|"Validate"| C
Loading

Grey Divider

File Changes

1. src/Core/src/Eventuous.Producers/ProduceRequest.cs ✨ Enhancement +13/-0

Add ProduceRequest record structs

• Create new file with two record struct types for multi-stream produce requests
• ProduceRequest: non-generic struct with Stream and Messages properties
• ProduceRequest<TProduceOptions>: generic struct adding Options property
• Use StructLayout(LayoutKind.Auto) for memory optimization

src/Core/src/Eventuous.Producers/ProduceRequest.cs


2. src/Core/src/Eventuous.Producers/IProducer.cs ✨ Enhancement +11/-0

Add multi-stream Produce to IProducer

• Add default interface method Produce(IReadOnlyCollection<ProduceRequest>) to IProducer
• Implement parallel execution via Task.WhenAll delegating to single-stream Produce
• Add XML documentation describing multi-stream parallel behavior
• Include required RequiresDynamicCode and RequiresUnreferencedCode attributes

src/Core/src/Eventuous.Producers/IProducer.cs


3. src/Core/src/Eventuous.Producers/BaseProducer.cs ✨ Enhancement +18/-0

Implement multi-stream Produce in BaseProducer

• Add override for Produce(IReadOnlyCollection<ProduceRequest<TProduceOptions>>) with empty check
• Add override for non-generic Produce(IReadOnlyCollection<ProduceRequest>) variant
• Both methods delegate to existing single-stream Produce via Task.WhenAll
• Include required serialization attributes for AOT compatibility

src/Core/src/Eventuous.Producers/BaseProducer.cs


View more (5)
4. src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs ✨ Enhancement +16/-0

Add multi-stream delegation to GatewayProducer

• Add Produce(IReadOnlyCollection<ProduceRequest<T>>) method with readiness check
• Add Produce(IReadOnlyCollection<ProduceRequest>) non-generic overload
• Both methods wait for inner producer readiness before delegating
• Support both BaseProducer<T> and generic IProducer<T> implementations

src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs


5. src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs ✨ Enhancement +14/-17

Simplify GatewayHandler with multi-stream produce

• Replace GroupBy + per-stream ProduceToStream calls with single multi-stream Produce call
• Construct ProduceRequest<TProduceOptions> array from grouped messages
• Eliminate ProduceToStream local function reducing code complexity
• Batch message transformation and metadata application before produce

src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs


6. src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs 🧪 Tests +90/-0

Add comprehensive multi-stream produce tests

• Add test for producing to multiple distinct streams in parallel
• Add test for handling empty request collections
• Add test for batching multiple messages to same stream
• Add test for untyped ProduceRequest (non-generic) variant
• Include TestProducer implementation extending BaseProducer<T>

src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs


7. docs/plans/2026-03-09-multi-stream-produce-design.md 📝 Documentation +64/-0

Add multi-stream produce design documentation

• Document design goals and new ProduceRequest type definitions
• Explain interface changes to IProducer and IProducer<TProduceOptions>
• Describe BaseProducer override strategy with tracing
• Outline Gateway update approach and testing strategy

docs/plans/2026-03-09-multi-stream-produce-design.md


8. docs/plans/2026-03-09-multi-stream-produce.md 📝 Documentation +339/-0

Add multi-stream produce implementation plan

• Provide detailed step-by-step implementation plan with 7 tasks
• Include code snippets for each task with expected outcomes
• Document architectural decisions and variance constraint workarounds
• Specify build and test verification steps for each phase

docs/plans/2026-03-09-multi-stream-produce.md


Grey Divider

Qodo Logo

@qodo-free-for-open-source-projects
Copy link
Contributor

qodo-free-for-open-source-projects bot commented Mar 9, 2026

Code Review by Qodo

🐞 Bugs (2) 📘 Rule violations (1) 📎 Requirement gaps (0)

Grey Divider


Action required

1. Missing newline in BaseProducer.cs 📘 Rule violation ✓ Correctness
Description
The modified BaseProducer.cs ends without a trailing newline, which introduces formatting that
conflicts with typical .editorconfig end-of-file rules. This can cause avoidable diff noise and
style enforcement failures in CI/editor tooling.
Code

src/Core/src/Eventuous.Producers/BaseProducer.cs[75]

}
Evidence
The compliance checklist requires adhering to repository .editorconfig formatting rules. The PR
modifies BaseProducer.cs and the diff indicates the file now has no newline at end-of-file (EOF),
which is a formatting violation.

CLAUDE.md
src/Core/src/Eventuous.Producers/BaseProducer.cs[58-75]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`src/Core/src/Eventuous.Producers/BaseProducer.cs` was modified in this PR and now ends without a trailing newline (EOF newline). This conflicts with repository formatting rules and can cause formatting checks/diff noise.
## Issue Context
Git reports `\ No newline at end of file` for the updated file; add a final newline after the last `}`.
## Fix Focus Areas
- src/Core/src/Eventuous.Producers/BaseProducer.cs[75-75]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. Hosted producer wait skipped🐞 Bug ⛯ Reliability
Description
GatewayProducer never actually waits for an inner IHostedProducer to become Ready because the guard
condition is inverted, so produces can run before the hosted producer is initialized. This affects
the new multi-stream Produce overloads as well, not just single-stream produces.
Code

src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs[R15-17]

+    public async Task Produce(IReadOnlyCollection<ProduceRequest<T>> requests, CancellationToken cancellationToken = default) {
+        if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); }
+
Evidence
_isHostedService is set to true when inner is NOT an IHostedProducer, but the code only calls
WaitForInner when that flag is true; WaitForInner itself only waits when inner IS an
IHostedProducer, so the wait loop is never reached for hosted producers.

src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs[6-17]
src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs[31-38]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`GatewayProducer&amp;amp;lt;T&amp;amp;gt;` uses an inverted `_isHostedService` flag, causing it to skip waiting for `IHostedProducer.Ready` before producing. This makes produce calls (including the new multi-stream overloads) race the hosted producer initialization.
### Issue Context
`WaitForInner` already checks `inner is not IHostedProducer` and returns immediately, so you can safely call it unconditionally, or fix the boolean to represent the correct condition.
### Fix Focus Areas
- src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs[6-29]
- src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs[31-38]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Remediation recommended

3. Stream options silently collapsed🐞 Bug ✓ Correctness
Description
GatewayHandler groups messages by TargetStream and uses only g.First().ProduceOptions for the entire
stream batch, so any subsequent message in the same stream group with different ProduceOptions will
be produced using the wrong options. This is a behavioral change from the prior per-message Produce
calls (explicitly noted in the implementation plan).
Code

src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs[R40-46]

+            var requests = shovelMessages
+                .GroupBy(x => x.TargetStream)
+                .Select(g => new ProduceRequest<TProduceOptions>(
+                    g.Key,
+                    g.Select(x => new ProducedMessage(x.Message, x.GetMeta(context), contextMeta) { OnAck = onAck, OnNack = onFail }),
+                    g.First().ProduceOptions
+                ))
Evidence
The new GatewayHandler implementation creates one ProduceRequest per stream and chooses the first
message’s ProduceOptions for that request, even though GatewayMessage carries ProduceOptions per
message; the plan also states the old code produced per message with its own options, so this is an
intentional but breaking semantic change if options differ within a stream.

src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs[40-46]
src/Gateway/src/Eventuous.Gateway/GatewayMessage.cs[6-6]
docs/plans/2026-03-09-multi-stream-produce.md[210-213]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`GatewayHandler` batches by `TargetStream` but assigns `g.First().ProduceOptions` to the whole batch. If a transform emits multiple messages to the same stream with different options, the later messages will be produced with incorrect options.
### Issue Context
`GatewayMessage&amp;amp;lt;TProduceOptions&amp;amp;gt;` carries `ProduceOptions` per message and the implementation plan explicitly notes the old code produced per message with its own options.
### Fix Focus Areas
- src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs[40-47]
- src/Gateway/src/Eventuous.Gateway/GatewayMessage.cs[6-6]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

ⓘ The new review experience is currently in Beta. Learn more

Grey Divider

Qodo Logo

@cloudflare-workers-and-pages
Copy link

Deploying eventuous-main with  Cloudflare Pages  Cloudflare Pages

Latest commit: 7d06e43
Status: ✅  Deploy successful!
Preview URL: https://3b7f936c.eventuous-main.pages.dev
Branch Preview URL: https://feat-multi-stream-produce.eventuous-main.pages.dev

View logs

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7d06e434ba

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

.Select(g => new ProduceRequest<TProduceOptions>(
g.Key,
g.Select(x => new ProducedMessage(x.Message, x.GetMeta(context), contextMeta) { OnAck = onAck, OnNack = onFail }),
g.First().ProduceOptions

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve per-message produce options in grouped requests

When multiple GatewayMessages target the same stream but carry different ProduceOptions, this batching logic collapses them to g.First().ProduceOptions, so every message in that stream is produced with the first message's options. Before this change each message called producer.Produce(..., x.ProduceOptions, ...) independently, so options could vary per message; now routing keys/partitions or other producer-specific options can be silently misapplied whenever mixed options appear in one stream group.

Useful? React with 👍 / 👎.


return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, cancellationToken)));
}
} No newline at end of file

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Missing newline in baseproducer.cs 📘 Rule violation ✓ Correctness

The modified BaseProducer.cs ends without a trailing newline, which introduces formatting that
conflicts with typical .editorconfig end-of-file rules. This can cause avoidable diff noise and
style enforcement failures in CI/editor tooling.
Agent Prompt
## Issue description
`src/Core/src/Eventuous.Producers/BaseProducer.cs` was modified in this PR and now ends without a trailing newline (EOF newline). This conflicts with repository formatting rules and can cause formatting checks/diff noise.

## Issue Context
Git reports `\ No newline at end of file` for the updated file; add a final newline after the last `}`.

## Fix Focus Areas
- src/Core/src/Eventuous.Producers/BaseProducer.cs[75-75]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

@github-actions
Copy link

github-actions bot commented Mar 9, 2026

Test Results

   60 files  + 40     60 suites  +40   37m 45s ⏱️ + 25m 30s
  344 tests + 14    344 ✅ + 14  0 💤 ±0  0 ❌ ±0 
1 035 runs  +694  1 035 ✅ +694  0 💤 ±0  0 ❌ ±0 

Results for commit cdf283c. ± Comparison against base commit 843d9ba.

This pull request removes 5 and adds 19 tests. Note that renamed tests count towards both.
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/9/2026 3:28:15 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/9/2026 3:28:15 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(fe4cf9ab-9c43-4952-aa51-88d2b803d746)
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2026-03-09T15:28:15.7005716+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-09T15:28:15.7005716+00:00 }, CommitPosition { Position: 0, Sequence: 4, Timestamp: 2026-03-09T15:28:15.7005716+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2026-03-09T15:28:15.7005716+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-09T15:28:15.7005716+00:00 })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2026-03-09T15:28:15.7005716+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-09T15:28:15.7005716+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2026-03-09T15:28:15.7005716+00:00 }, CommitPosition { Position: 0, Sequence: 8, Timestamp: 2026-03-09T15:28:15.7005716+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-09T15:28:15.7005716+00:00 })
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/9/2026 4:48:44 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/9/2026 4:48:44 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/9/2026 4:48:45 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/9/2026 4:48:45 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/9/2026 4:48:59 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/9/2026 4:48:59 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(326ea4c8-9bb9-4fb6-a7c3-72ce162321e1)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(6d68925b-e732-410c-b5a5-5701426aab37)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(d12e383d-6205-4f39-a3f9-112a2a83c95c)
Eventuous.Tests.Gateway.MultiStreamProduceTests ‑ ShouldHandleEmptyRequests
…

♻️ This comment has been updated with latest results.

alexeyzimarev and others added 3 commits March 9, 2026 17:26
- Add missing trailing newline in BaseProducer.cs
- Group by (Stream, ProduceOptions) instead of just Stream in
  GatewayHandler to preserve per-message options when messages to the
  same stream have different ProduceOptions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
WaitForInner already guards with `if (inner is not IHostedProducer) return`,
so call it unconditionally. The previous boolean was inverted (true when NOT
hosted), meaning the wait was never reached for actual hosted producers.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The emulator container readiness check was hitting the default 100-second
HttpClient.Timeout on CI runners, causing all Service Bus tests to fail.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@alexeyzimarev alexeyzimarev merged commit 71036e9 into dev Mar 9, 2026
6 checks passed
@alexeyzimarev alexeyzimarev deleted the feat/multi-stream-produce branch March 9, 2026 18:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant