Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,21 @@ def _poll_once(self, timer, max_records, update_offsets=True):
Returns:
dict: Map of topic to list of records (may be empty).
"""
# Return any already-buffered records before doing coordinator work.
# This fixes poll(timeout_ms=0) returning empty when records are already
# available, and avoids returning records that might be stale if the
# coordinator signals a rebalance later in this same poll cycle.
records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
if records:
log.debug('poll: returning buffered records before coordinator poll')
if not partial:
log.debug("poll: Sending fetches")
futures = self._fetcher.send_fetches()
if len(futures):
self._client.poll(timeout_ms=0)
return records

if not self._coordinator.poll(timeout_ms=timer.timeout_ms):
log.debug('poll: timeout during coordinator.poll(); returning early')
return {}

has_all_fetch_positions = self._update_fetch_positions(timeout_ms=timer.timeout_ms)
Expand Down
48 changes: 48 additions & 0 deletions test/test_consumer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from unittest.mock import patch, MagicMock
import pytest

from kafka import KafkaConsumer, TopicPartition
Expand Down Expand Up @@ -48,3 +49,50 @@ def test_assign():
consumer.subscribe(topics=['foo'])
consumer.assign([])
assert consumer.assignment() == set()


def test_poll_timeout_zero_returns_buffered_records():
    """A non-blocking poll hands back records the fetcher already holds.

    Regression test for https://github.com/dpkp/kafka-python/issues/2692

    The fetcher is stubbed to report one buffered record for the assigned
    partition, and the coordinator is stubbed so that its poll() would
    report a timeout. The consumer is expected to return the buffered
    records and never call coordinator.poll() at all.
    """
    partition = TopicPartition('test-topic', 0)
    consumer = KafkaConsumer(api_version=(0, 10, 0))
    consumer.assign([partition])

    buffered = {partition: [MagicMock()]}

    # Stand-ins for everything poll() may touch on this path: the fetcher
    # reports a complete (non-partial) batch, no new fetch requests are
    # produced, and the network client poll is a no-op.
    stub_fetched = patch.object(
        consumer._fetcher, 'fetched_records', return_value=(buffered, False))
    stub_send = patch.object(consumer._fetcher, 'send_fetches', return_value=[])
    stub_client_poll = patch.object(consumer._client, 'poll')
    stub_coordinator_poll = patch.object(
        consumer._coordinator, 'poll', return_value=False)

    with stub_fetched, stub_send, stub_client_poll, \
            stub_coordinator_poll as coordinator_poll:
        polled = consumer.poll(timeout_ms=0)

        assert polled == buffered, (
            "poll(timeout_ms=0) should return buffered records "
            "without needing a successful coordinator.poll()"
        )
        # The buffered-records shortcut must bypass the coordinator entirely.
        coordinator_poll.assert_not_called()


def test_poll_timeout_zero_returns_empty_when_no_buffered_records():
    """A non-blocking poll yields an empty dict when nothing is buffered.

    The fetcher is stubbed to report no records and the coordinator's
    poll() is stubbed to report a timeout; poll(timeout_ms=0) is then
    expected to come back with an empty result.
    """
    partition = TopicPartition('test-topic', 0)
    consumer = KafkaConsumer(api_version=(0, 10, 0))
    consumer.assign([partition])

    stub_fetched = patch.object(
        consumer._fetcher, 'fetched_records', return_value=({}, False))
    stub_coordinator_poll = patch.object(
        consumer._coordinator, 'poll', return_value=False)

    with stub_fetched, stub_coordinator_poll:
        polled = consumer.poll(timeout_ms=0)

        assert polled == {}, (
            "poll(timeout_ms=0) should return empty dict when no records are buffered"
        )
Loading