From f2d3f0f87b2f552b059770d5c221fc950694fec8 Mon Sep 17 00:00:00 2001 From: nkanapar Date: Mon, 2 Mar 2026 13:59:44 -0500 Subject: [PATCH 1/2] fix: poll(timeout_ms=0) returns buffered records when coordinator times out. Fixes #2692 --- kafka/consumer/group.py | 15 ++++++++++++- test/test_consumer.py | 47 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 7c8b1cb2f..4464f252c 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -718,7 +718,20 @@ def _poll_once(self, timer, max_records, update_offsets=True): dict: Map of topic to list of records (may be empty). """ if not self._coordinator.poll(timeout_ms=timer.timeout_ms): - log.debug('poll: timeout during coordinator.poll(); returning early') + log.debug('poll: timeout during coordinator.poll()') + # Still return any records already available in the fetch buffer. + # This is critical for timeout_ms=0 (non-blocking) usage where + # previous poll() calls may have issued fetches that have since + # completed and populated the buffer. + records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) + if records: + log.debug('poll: returning %d partitions of previously fetched records after coordinator timeout', len(records)) + if not partial: + futures = self._fetcher.send_fetches() + if len(futures): + self._client.poll(timeout_ms=0) + return records + log.debug('poll: no previously fetched records; returning early') return {} has_all_fetch_positions = self._update_fetch_positions(timeout_ms=timer.timeout_ms) diff --git a/test/test_consumer.py b/test/test_consumer.py index 60d101d69..4520ca9c0 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -1,3 +1,4 @@ +from unittest.mock import patch, MagicMock import pytest from kafka import KafkaConsumer, TopicPartition @@ -48,3 +49,49 @@ def test_assign(): consumer.subscribe(topics=['foo']) consumer.assign([]) assert consumer.assignment() == set() + + +def test_poll_timeout_zero_returns_buffered_records(): + """Test that poll(timeout_ms=0) returns records already in the fetch buffer. + + Regression test for https://github.com/dpkp/kafka-python/issues/2692 + When timeout_ms=0, coordinator.poll() may time out, but we should still + check the fetcher buffer for already-fetched records before returning empty. + """ + consumer = KafkaConsumer(api_version=(0, 10, 0)) + tp = TopicPartition('test-topic', 0) + consumer.assign([tp]) + + mock_records = {tp: [MagicMock()]} + + # Simulate coordinator.poll() timing out (returns False) + # but fetcher already has records buffered from a previous network poll + with patch.object(consumer._coordinator, 'poll', return_value=False), \ + patch.object(consumer._fetcher, 'fetched_records', return_value=(mock_records, False)), \ + patch.object(consumer._fetcher, 'send_fetches', return_value=[]), \ + patch.object(consumer._client, 'poll'): + + result = consumer.poll(timeout_ms=0) + assert result == mock_records, ( + "poll(timeout_ms=0) should return buffered records " + "even when coordinator.poll() times out" + ) + + +def test_poll_timeout_zero_returns_empty_when_no_buffered_records(): + """Test that poll(timeout_ms=0) returns empty when no records are buffered. + + Ensures that the non-blocking behavior is preserved -- if there are no + records in the buffer and coordinator times out, return empty immediately. + """ + consumer = KafkaConsumer(api_version=(0, 10, 0)) + tp = TopicPartition('test-topic', 0) + consumer.assign([tp]) + + with patch.object(consumer._coordinator, 'poll', return_value=False), \ + patch.object(consumer._fetcher, 'fetched_records', return_value=({}, False)): + + result = consumer.poll(timeout_ms=0) + assert result == {}, ( + "poll(timeout_ms=0) should return empty dict when no records are buffered" + ) From bb5c9ac4921e17d91070e750b6716781d06df0f5 Mon Sep 17 00:00:00 2001 From: nkanapar Date: Sun, 8 Mar 2026 15:57:50 -0400 Subject: [PATCH 2/2] refactor: check fetch buffer before coordinator.poll() to fix poll(timeout_ms=0) Per maintainer feedback on #2718: returning buffered records after coordinator.poll() fails is unsafe because False can mean a rebalance is in progress and the consumer may no longer own those partitions. Fix by checking fetched_records() *before* coordinator.poll(). Records already in the buffer were fetched during a valid partition assignment (prior to this poll cycle), so returning them is always safe. If the buffer is empty we proceed with coordinator.poll() as before. This also means coordinator.poll() is never called when buffered records are present, which is the correct non-blocking semantic for timeout_ms=0. Fixes #2692 --- kafka/consumer/group.py | 28 ++++++++++++++-------------- test/test_consumer.py | 21 +++++++++++---------- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 4464f252c..6a1ebc7cb 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -717,21 +717,21 @@ def _poll_once(self, timer, max_records, update_offsets=True): Returns: dict: Map of topic to list of records (may be empty). """ + # Return any already-buffered records before doing coordinator work. + # This fixes poll(timeout_ms=0) returning empty when records are already + # available, and avoids returning records that might be stale if the + # coordinator signals a rebalance later in this same poll cycle. + records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) + if records: + log.debug('poll: returning buffered records before coordinator poll') + if not partial: + log.debug("poll: Sending fetches") + futures = self._fetcher.send_fetches() + if len(futures): + self._client.poll(timeout_ms=0) + return records + if not self._coordinator.poll(timeout_ms=timer.timeout_ms): - log.debug('poll: timeout during coordinator.poll()') - # Still return any records already available in the fetch buffer. - # This is critical for timeout_ms=0 (non-blocking) usage where - # previous poll() calls may have issued fetches that have since - # completed and populated the buffer. - records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) - if records: - log.debug('poll: returning %d partitions of previously fetched records after coordinator timeout', len(records)) - if not partial: - futures = self._fetcher.send_fetches() - if len(futures): - self._client.poll(timeout_ms=0) - return records - log.debug('poll: no previously fetched records; returning early') return {} has_all_fetch_positions = self._update_fetch_positions(timeout_ms=timer.timeout_ms) diff --git a/test/test_consumer.py b/test/test_consumer.py index 4520ca9c0..1c2c9d348 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -55,8 +55,8 @@ def test_poll_timeout_zero_returns_buffered_records(): """Test that poll(timeout_ms=0) returns records already in the fetch buffer. Regression test for https://github.com/dpkp/kafka-python/issues/2692 - When timeout_ms=0, coordinator.poll() may time out, but we should still - check the fetcher buffer for already-fetched records before returning empty. + Buffered records are checked before coordinator.poll(), so they are + returned regardless of coordinator state (rebalance, timeout, etc.). """ consumer = KafkaConsumer(api_version=(0, 10, 0)) tp = TopicPartition('test-topic', 0) @@ -64,18 +64,19 @@ def test_poll_timeout_zero_returns_buffered_records(): mock_records = {tp: [MagicMock()]} - # Simulate coordinator.poll() timing out (returns False) - # but fetcher already has records buffered from a previous network poll - with patch.object(consumer._coordinator, 'poll', return_value=False), \ - patch.object(consumer._fetcher, 'fetched_records', return_value=(mock_records, False)), \ + # Fetcher already has records buffered from a previous network poll. + # coordinator.poll() should never be reached in this path. + with patch.object(consumer._fetcher, 'fetched_records', return_value=(mock_records, False)), \ patch.object(consumer._fetcher, 'send_fetches', return_value=[]), \ - patch.object(consumer._client, 'poll'): + patch.object(consumer._client, 'poll'), \ + patch.object(consumer._coordinator, 'poll', return_value=False) as mock_coord_poll: result = consumer.poll(timeout_ms=0) assert result == mock_records, ( "poll(timeout_ms=0) should return buffered records " - "even when coordinator.poll() times out" + "without needing a successful coordinator.poll()" ) + mock_coord_poll.assert_not_called() def test_poll_timeout_zero_returns_empty_when_no_buffered_records(): @@ -88,8 +89,8 @@ def test_poll_timeout_zero_returns_empty_when_no_buffered_records(): tp = TopicPartition('test-topic', 0) consumer.assign([tp]) - with patch.object(consumer._coordinator, 'poll', return_value=False), \ - patch.object(consumer._fetcher, 'fetched_records', return_value=({}, False)): + with patch.object(consumer._fetcher, 'fetched_records', return_value=({}, False)), \ + patch.object(consumer._coordinator, 'poll', return_value=False): result = consumer.poll(timeout_ms=0) assert result == {}, (