diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 7c8b1cb2f..6a1ebc7cb 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -717,8 +717,21 @@ def _poll_once(self, timer, max_records, update_offsets=True): Returns: dict: Map of topic to list of records (may be empty). """ + # Return any already-buffered records before doing coordinator work. + # This fixes poll(timeout_ms=0) returning empty when records are already + # available, and avoids returning records that might be stale if the + # coordinator signals a rebalance later in this same poll cycle. + records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) + if records: + log.debug('poll: returning buffered records before coordinator poll') + if not partial: + log.debug("poll: Sending fetches") + futures = self._fetcher.send_fetches() + if len(futures): + self._client.poll(timeout_ms=0) + return records + if not self._coordinator.poll(timeout_ms=timer.timeout_ms): - log.debug('poll: timeout during coordinator.poll(); returning early') return {} has_all_fetch_positions = self._update_fetch_positions(timeout_ms=timer.timeout_ms) diff --git a/test/test_consumer.py b/test/test_consumer.py index 60d101d69..1c2c9d348 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -1,3 +1,4 @@ +from unittest.mock import patch, MagicMock import pytest from kafka import KafkaConsumer, TopicPartition @@ -48,3 +49,50 @@ def test_assign(): consumer.subscribe(topics=['foo']) consumer.assign([]) assert consumer.assignment() == set() + + +def test_poll_timeout_zero_returns_buffered_records(): + """Test that poll(timeout_ms=0) returns records already in the fetch buffer. + + Regression test for https://github.com/dpkp/kafka-python/issues/2692 + Buffered records are checked before coordinator.poll(), so they are + returned regardless of coordinator state (rebalance, timeout, etc.). + """ + consumer = KafkaConsumer(api_version=(0, 10, 0)) + tp = TopicPartition('test-topic', 0) + consumer.assign([tp]) + + mock_records = {tp: [MagicMock()]} + + # Fetcher already has records buffered from a previous network poll. + # coordinator.poll() should never be reached in this path. + with patch.object(consumer._fetcher, 'fetched_records', return_value=(mock_records, False)), \ + patch.object(consumer._fetcher, 'send_fetches', return_value=[]), \ + patch.object(consumer._client, 'poll'), \ + patch.object(consumer._coordinator, 'poll', return_value=False) as mock_coord_poll: + + result = consumer.poll(timeout_ms=0) + assert result == mock_records, ( + "poll(timeout_ms=0) should return buffered records " + "without needing a successful coordinator.poll()" + ) + mock_coord_poll.assert_not_called() + + +def test_poll_timeout_zero_returns_empty_when_no_buffered_records(): + """Test that poll(timeout_ms=0) returns empty when no records are buffered. + + Ensures that the non-blocking behavior is preserved -- if there are no + records in the buffer and coordinator times out, return empty immediately. + """ + consumer = KafkaConsumer(api_version=(0, 10, 0)) + tp = TopicPartition('test-topic', 0) + consumer.assign([tp]) + + with patch.object(consumer._fetcher, 'fetched_records', return_value=({}, False)), \ + patch.object(consumer._coordinator, 'poll', return_value=False): + + result = consumer.poll(timeout_ms=0) + assert result == {}, ( + "poll(timeout_ms=0) should return empty dict when no records are buffered" + )