fix: poll(timeout_ms=0) returns buffered records when coordinator times out#2718

Open
HKanaparthi wants to merge 2 commits into dpkp:master from HKanaparthi:fix/poll-timeout-zero-buffered-records

Conversation

@HKanaparthi

Fixes #2692

Root cause:
When timeout_ms=0, coordinator.poll() returns False immediately, causing
an early return {} at line 722 that skipped the fetched_records() buffer
check — so buffered messages from previous poll calls were silently dropped.

Fix:
Added a buffer check inside the early-exit branch so already-fetched
records are still returned even when the coordinator times out.
Non-blocking behavior is fully preserved.
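For illustration, here is a minimal, self-contained sketch of the two code paths. All class and function names below are stand-ins, not kafka-python internals; only the call pattern (coordinator.poll() followed by fetched_records()) mirrors the real client:

```python
# Toy model of the early-exit path in KafkaConsumer.poll().
# ToyCoordinator / ToyFetcher are illustrative stand-ins.

class ToyCoordinator:
    def __init__(self, ready):
        self.ready = ready

    def poll(self, timeout_ms):
        # With timeout_ms=0 and the coordinator still unknown, the real
        # coordinator.poll() returns False immediately.
        return self.ready


class ToyFetcher:
    def __init__(self, buffered):
        self._buffered = buffered

    def fetched_records(self):
        # Drain and return whatever was buffered by earlier fetches.
        records, self._buffered = self._buffered, {}
        return records


def poll_before_fix(coordinator, fetcher, timeout_ms=0):
    # Original behavior: early return {} silently drops the buffer.
    if not coordinator.poll(timeout_ms=timeout_ms):
        return {}
    return fetcher.fetched_records()


def poll_after_fix(coordinator, fetcher, timeout_ms=0):
    if not coordinator.poll(timeout_ms=timeout_ms):
        # Patched: return already-buffered records instead of {}.
        return fetcher.fetched_records()
    # (the real client would issue new fetches here; elided)
    return fetcher.fetched_records()
```

With records buffered and a not-ready coordinator, `poll_before_fix` returns `{}` while `poll_after_fix` returns the buffered records.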

Tests:
Added 2 regression tests covering:

  • poll(timeout_ms=0) returns buffered records when coordinator times out
  • poll(timeout_ms=0) returns empty when no records are buffered

Note: Tests use mocks. Happy to add integration tests if needed.
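As a hedged sketch, the two regression tests have roughly this shape. The actual tests mock KafkaConsumer internals; here a minimal stand-in `poll_once` keeps the example runnable, and all names are illustrative:

```python
# Sketch of the two regression tests described above, against a
# stand-in poll_once() rather than the real KafkaConsumer.
from unittest.mock import MagicMock


def poll_once(coordinator, fetcher, timeout_ms=0):
    # Patched early-exit: return buffered records when the
    # coordinator times out instead of dropping them.
    if not coordinator.poll(timeout_ms=timeout_ms):
        return fetcher.fetched_records()
    return fetcher.fetched_records()


def test_poll_timeout_zero_returns_buffered_records():
    coordinator = MagicMock()
    coordinator.poll.return_value = False  # coordinator "times out"
    fetcher = MagicMock()
    fetcher.fetched_records.return_value = {"tp0": [b"m1"]}
    assert poll_once(coordinator, fetcher, timeout_ms=0) == {"tp0": [b"m1"]}


def test_poll_timeout_zero_returns_empty_when_no_buffer():
    coordinator = MagicMock()
    coordinator.poll.return_value = False
    fetcher = MagicMock()
    fetcher.fetched_records.return_value = {}
    assert poll_once(coordinator, fetcher, timeout_ms=0) == {}
```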

@dpkp
Owner

dpkp commented Mar 8, 2026

I'm still confused about this. The behavior of dropping buffered fetch requests when the coordinator check fails is expected. It may mean, for example, that the group is in rebalance and the consumer is either no longer in the group or needs to refresh partition assignment. In either case, I believe it is wrong to continue processing fetched records.

But suppose coordinator.poll() returns False when timeout_ms=0: shouldn't the correct behavior be to retry until the coordinator is ready and the group is stable?

  poll(timeout_ms=0)

  Per maintainer feedback on dpkp#2718: returning buffered records after
  coordinator.poll() fails is unsafe because False can mean a rebalance
  is in progress and the consumer may no longer own those partitions.

  Fix by checking fetched_records() *before* coordinator.poll(). Records
  already in the buffer were fetched during a valid partition assignment
  (prior to this poll cycle), so returning them is always safe. If the
  buffer is empty we proceed with coordinator.poll() as before.

  This also means coordinator.poll() is never called when buffered records
  are present, which is the correct non-blocking semantic for timeout_ms=0.

  Fixes dpkp#2692
@HKanaparthi
Author

Hi @dpkp, thanks for the feedback! You're right — returning records after coordinator.poll() fails is unsafe when the failure indicates a rebalance or lost partition assignment.

I've updated the fix with a different approach:
instead of checking the buffer after coordinator failure, we now check fetched_records() before calling coordinator.poll(). Records already in the buffer were fetched during a valid partition assignment (prior to this poll cycle), so returning them is always safe. If the buffer is empty, we proceed with coordinator.poll() as normal — and if that times out, we return {} as before.

This also means coordinator.poll() is never called when buffered records exist, which is the correct non-blocking semantic for timeout_ms=0.
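A self-contained toy sketch of this reordering (stand-in names; only the fetched_records()-before-coordinator.poll() ordering reflects the actual change):

```python
# Toy sketch: check the fetch buffer *before* coordinator.poll().
# Names are illustrative stand-ins, not kafka-python internals.

class ToyFetcher:
    def __init__(self, buffered):
        self._buffered = buffered

    def fetched_records(self):
        # Drain and return whatever earlier fetches buffered.
        records, self._buffered = self._buffered, {}
        return records


def poll_once(coordinator_poll, fetcher, timeout_ms=0):
    # 1. Buffered records were fetched under a valid assignment, so
    #    return them immediately without touching the coordinator.
    records = fetcher.fetched_records()
    if records:
        return records
    # 2. Empty buffer: run the coordinator check as before; a timeout
    #    still yields {} for the non-blocking timeout_ms=0 case.
    if not coordinator_poll(timeout_ms=timeout_ms):
        return {}
    return fetcher.fetched_records()
```

Note that when records are buffered, `coordinator_poll` is never invoked, matching the non-blocking semantic described above.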

@dpkp
Owner

dpkp commented Mar 8, 2026

I'm still confused about the root cause and so I am not confident that these changes make sense. You said that When timeout_ms=0, coordinator.poll() returns False immediately. But that's only true if the coordinator is still unknown (we are waiting for FindCoordinatorResponse) or the group needs rejoin. In both cases isn't simple retry the solution? Eventually the coordinator is found and the group is joined and then at that point we would consume messages. Did you find something else?

@HKanaparthi
Author

Good point — I think I over-assumed the root cause here.
That said, one edge case I had in mind: coordinator_unknown() can transiently return True during reconnect backoff (base.py:261) even when the group was previously stable. So my thinking was — records already buffered from a prior fetch, coordinator blips, and those records get silently dropped. Not sure if that's what the original reporter actually hit though.
To make sure I fix this correctly: for poll(timeout_ms=0), is the right answer simply "retry until the coordinator stabilizes and records flow naturally"? If so, I'm happy to revert the code change and just add a doc note clarifying the non-blocking retry semantics instead.



Development

Successfully merging this pull request may close these issues.

KafkaConsumer.poll() does not poll messages with default value for timeout_ms

2 participants